# encoding: utf-8

require "eventmachine"
require "amq/client"
require "amq/client/async/adapter"
require "amq/client/framing/string/frame"

module AMQ
  module Client
    module Async
      class EventMachineClient < EM::Connection

        #
        # Behaviours
        #

        include AMQ::Client::Async::Adapter


        #
        # API
        #

        # @group Connection operations

        # Initiates connection to AMQP broker. If callback is given, runs it when (and if) AMQP connection
        # succeeds.
        #
        # @option settings [String] :host ("127.0.0.1") Hostname AMQ broker runs on.
        # @option settings [String] :port (5672) Port AMQ broker listens on.
        # @option settings [String] :vhost ("/") Virtual host to use.
        # @option settings [String] :user ("guest") Username to use for authentication.
        # @option settings [String] :pass ("guest") Password to use for authentication.
        # @option settings [String] :ssl (false) Should be use TLS (SSL) for connection?
        # @option settings [String] :timeout (nil) Connection timeout.
        # @option settings [String] :logging (false) Turns logging on or off.
        # @option settings [String] :broker (nil) Broker name (use if you intend to use broker-specific features).
        # @option settings [Fixnum] :frame_max (131072) Maximum frame size to use. If broker cannot support frames this large, broker's maximum value will be used instead.
        #
        # @param [Hash] settings
        def self.connect(settings = {}, &block)
          @settings = Settings.configure(settings)

          instance = EventMachine.connect(@settings[:host], @settings[:port], self, @settings)
          instance.register_connection_callback(&block)

          instance
        end

        # Reconnect after a period of wait.
        #
        # @param [Fixnum]  period Period of time, in seconds, to wait before reconnection attempt.
        # @param [Boolean] force  If true, enforces immediate reconnection.
        # @api public
        def reconnect(force = false, period = 5)
          if @reconnecting and not force
            EventMachine::Timer.new(period) {
              reconnect(true, period)
            }
            return
          end

          if !@reconnecting
            @reconnecting = true
            self.reset
          end

          EventMachine.reconnect(@settings[:host], @settings[:port], self)
        end

        # Similar to #reconnect, but uses different connection settings
        # @see #reconnect
        # @api public
        def reconnect_to(settings, period = 5)
          if !@reconnecting
            @reconnecting = true
            self.reset
          end

          @settings = Settings.configure(settings)
          EventMachine.reconnect(@settings[:host], @settings[:port], self)
        end


        # Periodically try to reconnect.
        #
        # @param [Fixnum]  period Period of time, in seconds, to wait before reconnection attempt.
        # @param [Boolean] force  If true, enforces immediate reconnection.
        # @api public
        def periodically_reconnect(period = 5)
          @reconnecting = true
          self.reset

          @periodic_reconnection_timer = EventMachine::PeriodicTimer.new(period) {
            EventMachine.reconnect(@settings[:host], @settings[:port], self)
          }
        end

        # @endgroup




        # Defines a callback that will be executed when AMQP connection is considered open:
        # client and broker has agreed on max channel identifier and maximum allowed frame
        # size and authentication succeeds. You can define more than one callback.
        #
        # @see on_possible_authentication_failure
        # @api public
        def on_open(&block)
          @connection_deferrable.callback(&block)
        end # on_open(&block)
        alias on_connection on_open

        # Defines a callback that will be run when broker confirms connection termination
        # (client receives connection.close-ok). You can define more than one callback.
        #
        # @api public
        def on_closed(&block)
          @disconnection_deferrable.callback(&block)
        end # on_closed(&block)
        alias on_disconnection on_closed

        # @see #on_open
        # @private
        def register_connection_callback(&block)
          unless block.nil?
            # delay calling block we were given till after we receive
            # connection.open-ok. Connection will notify us when
            # that happens.
            self.on_open do
              block.call(self)
            end
          end
        end




        def initialize(*args)
          super(*args)

          self.logger   = self.class.logger

          @frames            = Array.new
          @channels          = Hash.new
          @callbacks         = Hash.new

          opening!

          # track TCP connection state, used to detect initial TCP connection failures.
          @tcp_connection_established       = false
          @tcp_connection_failed            = false
          @intentionally_closing_connection = false

          # EventMachine::Connection's and Adapter's constructors arity
          # make it easier to use *args. MK.
          @settings                           = Settings.configure(args.first)
          @on_tcp_connection_failure          = @settings[:on_tcp_connection_failure] || Proc.new { |settings|
            raise self.class.tcp_connection_failure_exception_class.new(settings)
          }
          @on_possible_authentication_failure = @settings[:on_possible_authentication_failure] || Proc.new { |settings|
            raise self.class.authentication_failure_exception_class.new(settings)
          }

          @mechanism         = "PLAIN"
          @locale            = @settings.fetch(:locale, "en_GB")
          @client_properties = Settings.client_properties.merge(@settings.fetch(:client_properties, Hash.new))

          @auto_recovery     = (!!@settings[:auto_recovery])

          self.reset
          self.set_pending_connect_timeout((@settings[:timeout] || 3).to_f) unless defined?(JRUBY_VERSION)

          if self.heartbeat_interval > 0
            self.initialize_heartbeat_sender
          end
        end # initialize(*args)



        # For EventMachine adapter, this is a no-op.
        # @api public
        def establish_connection(settings)
          # Unfortunately there doesn't seem to be any sane way
          # how to get EventMachine connect to the instance level.
        end

        alias close disconnect



        # Whether we are in authentication state (after TCP connection was estabilished
        # but before broker authenticated us).
        #
        # @return [Boolean]
        # @api public
        def authenticating?
          @authenticating
        end # authenticating?

        # IS TCP connection estabilished and currently active?
        # @return [Boolean]
        # @api public
        def tcp_connection_established?
          @tcp_connection_established
        end # tcp_connection_established?




        #
        # Implementation
        #

        # Backwards compatibility with 0.7.0.a25. MK.
        Deferrable = EventMachine::DefaultDeferrable


        alias send_raw send_data


        # EventMachine reactor callback. Is run when TCP connection is estabilished
        # but before resumption of the network loop. Note that this includes cases
        # when TCP connection has failed.
        # @private
        def post_init
          reset

          # note that upgrading to TLS in #connection_completed causes
          # Erlang SSL app that RabbitMQ relies on to report
          # error on TCP connection <0.1465.0>:{ssl_upgrade_error,"record overflow"}
          # and close TCP connection down. Investigation of this issue is likely
          # to take some time and to not be worth in as long as #post_init
          # works fine. MK.
          upgrade_to_tls_if_necessary
        rescue Exception => error
          raise error
        end # post_init



        # Called by EventMachine reactor once TCP connection is successfully estabilished.
        # @private
        def connection_completed
          # we only can safely set this value here because EventMachine is a lovely piece of
          # software that calls #post_init before #unbind even when TCP connection
          # fails. MK.
          @tcp_connection_established       = true
          @periodic_reconnection_timer.cancel if @periodic_reconnection_timer


          # again, this is because #unbind is called in different situations
          # and there is no easy way to tell initial connection failure
          # from connection loss. Not in EventMachine 0.12.x, anyway. MK.

          if @had_successfully_connected_before
            @recovered = true


            self.start_automatic_recovery
            self.upgrade_to_tls_if_necessary
          end

          # now we can set it. MK.
          @had_successfully_connected_before = true
          @reconnecting                      = false
          @handling_skipped_hearbeats        = false
          @last_server_heartbeat             = Time.now

          self.initialize_heartbeat_sender if self.heartbeat_interval > 0

          self.handshake
        end

        # @private
        def close_connection(*args)
          @intentionally_closing_connection = true

          super(*args)
        end

        # Called by EventMachine reactor when
        #
        # * We close TCP connection down
        # * Our peer closes TCP connection down
        # * There is a network connection issue
        # * Initial TCP connection fails
        # @private
        def unbind(exception = nil)
          if !@tcp_connection_established && !@had_successfully_connected_before && !@intentionally_closing_connection
            @tcp_connection_failed = true
            logger.error "[amqp] Detected TCP connection failure"
            self.tcp_connection_failed
          end

          closing!
          @tcp_connection_established = false

          self.handle_connection_interruption if @reconnecting
          @disconnection_deferrable.succeed

          closed!


          self.tcp_connection_lost if !@intentionally_closing_connection && @had_successfully_connected_before

          # since AMQP spec dictates that authentication failure is a protocol exception
          # and protocol exceptions result in connection closure, check whether we are
          # in the authentication stage. If so, it is likely to signal an authentication
          # issue. Java client behaves the same way. MK.
          if authenticating? && !@intentionally_closing_connection
            @on_possible_authentication_failure.call(@settings) if @on_possible_authentication_failure
          end
        end # unbind


        #
        # EventMachine receives data in chunks, sometimes those chunks are smaller
        # than the size of AMQP frame. That's why you need to add some kind of buffer.
        #
        # @private
        def receive_data(chunk)
          @chunk_buffer << chunk
          while frame = get_next_frame
            self.receive_frame(AMQ::Client::Framing::String::Frame.decode(frame))
          end
        end


        # Called by AMQ::Client::Connection after we receive connection.open-ok.
        # @api public
        def connection_successful
          @authenticating = false
          opened!

          @connection_deferrable.succeed
        end # connection_successful


        # Called by AMQ::Client::Connection after we receive connection.close-ok.
        #
        # @api public
        def disconnection_successful
          @disconnection_deferrable.succeed

          # true for "after writing buffered data"
          self.close_connection(true)
          self.reset
          closed!
        end # disconnection_successful

        # Called when time since last server heartbeat received is greater or equal to the
        # heartbeat interval set via :heartbeat_interval option on connection.
        #
        # @api plugin
        def handle_skipped_hearbeats
          if !@handling_skipped_hearbeats && @tcp_connection_established && !@intentionally_closing_connection
            @handling_skipped_hearbeats = true
            @heartbeats_timer.cancel

            self.run_skipped_heartbeats_callbacks
          end
        end

        # @private
        def initialize_heartbeat_sender
          @last_server_heartbeat = Time.now
          @heartbeats_timer      = EventMachine::PeriodicTimer.new(self.heartbeat_interval, &method(:send_heartbeat))
        end



        self.handle(Protocol::Connection::Start) do |connection, frame|
          connection.handle_start(frame.decode_payload)
        end

        self.handle(Protocol::Connection::Tune) do |connection, frame|
          connection.handle_tune(frame.decode_payload)

          connection.open(connection.vhost)
        end

        self.handle(Protocol::Connection::OpenOk) do |connection, frame|
          connection.handle_open_ok(frame.decode_payload)
        end

        self.handle(Protocol::Connection::Close) do |connection, frame|
          connection.handle_close(frame.decode_payload)
        end

        self.handle(Protocol::Connection::CloseOk) do |connection, frame|
          connection.handle_close_ok(frame.decode_payload)
        end




        protected


        def reset
          @size      = 0
          @payload   = ""
          @frames    = Array.new

          @chunk_buffer                 = ""
          @connection_deferrable        = EventMachine::DefaultDeferrable.new
          @disconnection_deferrable     = EventMachine::DefaultDeferrable.new

          # used to track down whether authentication succeeded. AMQP 0.9.1 dictates
          # that on authentication failure broker must close TCP connection without sending
          # any more data. This is why we need to explicitly track whether we are past
          # authentication stage to signal possible authentication failures.
          @authenticating           = false
        end

        def upgrade_to_tls_if_necessary
          tls_options = @settings[:ssl]

          if tls_options.is_a?(Hash)
            start_tls(tls_options)
          elsif tls_options
            start_tls
          end
        end # upgrade_to_tls_if_necessary
      end # EventMachineClient
    end # Async
  end # Client
end # AMQ