# encoding: utf-8

require "eventmachine"
require "amq/client"
require "amq/client/framing/string/frame"

module AMQ
  module Client
    class EventMachineClient < EM::Connection

      #
      # Behaviors
      #

      include AMQ::Client::Adapter


      #
      # API
      #

      def self.connect(settings = {}, &block)
        @settings = Settings.configure(settings)

        instance = EventMachine.connect(@settings[:host], @settings[:port], self, @settings)
        instance.register_connection_callback(&block)

        instance
      end

      # Reconnect after a period of wait.
      #
      # @param [Fixnum]  period Period of time, in seconds, to wait before reconnection attempt.
      # @param [Boolean] force  If true, enforces immediate reconnection.
      # @api public
      def reconnect(force = false, period = 5)
        if @reconnecting and not force
          EventMachine::Timer.new(period) {
            reconnect(true, period)
          }
          return
        end

        if !@reconnecting
          @reconnecting = true

          self.handle_connection_interruption
          self.reset
        end

        EventMachine.reconnect(@settings[:host], @settings[:port], self)
      end





      # Defines a callback that will be executed when AMQP connection is considered open:
      # client and broker has agreed on max channel identifier and maximum allowed frame
      # size and authentication succeeds. You can define more than one callback.
      #
      # @see on_possible_authentication_failure
      # @api public
      def on_open(&block)
        @connection_deferrable.callback(&block)
      end # on_open(&block)
      alias on_connection on_open

      # Defines a callback that will be run when broker confirms connection termination
      # (client receives connection.close-ok). You can define more than one callback.
      #
      # @api public
      def on_closed(&block)
        @disconnection_deferrable.callback(&block)
      end # on_closed(&block)
      alias on_disconnection on_closed

      # Defines a callback that will be run when initial TCP connection fails.
      # You can define only one callback.
      #
      # @api public
      def on_tcp_connection_failure(&block)
        @on_tcp_connection_failure = block
      end

      # Defines a callback that will be run when initial TCP connection fails.
      # You can define only one callback.
      #
      # @api public
      def on_tcp_connection_loss(&block)
        @on_tcp_connection_loss = block
      end

      # Defines a callback that will be run when TCP connection is closed before authentication
      # finishes. Usually this means authentication failure. You can define only one callback.
      #
      # @api public
      def on_possible_authentication_failure(&block)
        @on_possible_authentication_failure = block
      end

      # @see #on_open
      # @private
      def register_connection_callback(&block)
        unless block.nil?
          # delay calling block we were given till after we receive
          # connection.open-ok. Connection will notify us when
          # that happens.
          self.on_open do
            block.call(self)
          end
        end
      end




      def initialize(*args)
        super(*args)

        self.logger   = self.class.logger

        @frames            = Array.new
        @channels          = Hash.new

        opening!

        # track TCP connection state, used to detect initial TCP connection failures.
        @tcp_connection_established       = false
        @tcp_connection_failed            = false
        @intentionally_closing_connection = false

        # EventMachine::Connection's and Adapter's constructors arity
        # make it easier to use *args. MK.
        @settings                           = Settings.configure(args.first)
        @on_tcp_connection_failure          = @settings[:on_tcp_connection_failure] || Proc.new { |settings|
          raise self.class.tcp_connection_failure_exception_class.new(settings)
        }
        @on_possible_authentication_failure = @settings[:on_possible_authentication_failure] || Proc.new { |settings|
          raise self.class.authentication_failure_exception_class.new(settings)
        }

        @mechanism         = "PLAIN"
        @locale            = @settings.fetch(:locale, "en_GB")
        @client_properties = Settings.client_properties.merge(@settings.fetch(:client_properties, Hash.new))

        self.reset
        self.set_pending_connect_timeout((@settings[:timeout] || 3).to_f) unless defined?(JRUBY_VERSION)

        if self.heartbeat_interval > 0
          @last_server_heartbeat = Time.now
          EventMachine.add_periodic_timer(self.heartbeat_interval, &method(:send_heartbeat))
        end
      end # initialize(*args)



      # For EventMachine adapter, this is a no-op.
      # @api public
      def establish_connection(settings)
        # Unfortunately there doesn't seem to be any sane way
        # how to get EventMachine connect to the instance level.
      end

      alias close disconnect



      # Whether we are in authentication state (after TCP connection was estabilished
      # but before broker authenticated us).
      #
      # @return [Boolean]
      # @api public
      def authenticating?
        @authenticating
      end # authenticating?

      # IS TCP connection estabilished and currently active?
      # @return [Boolean]
      # @api public
      def tcp_connection_established?
        @tcp_connection_established
      end # tcp_connection_established?






      #
      # Implementation
      #

      # Backwards compatibility with 0.7.0.a25. MK.
      Deferrable = EventMachine::DefaultDeferrable


      alias send_raw send_data


      # EventMachine reactor callback. Is run when TCP connection is estabilished
      # but before resumption of the network loop. Note that this includes cases
      # when TCP connection has failed.
      # @private
      def post_init
        reset

        # note that upgrading to TLS in #connection_completed causes
        # Erlang SSL app that RabbitMQ relies on to report
        # error on TCP connection <0.1465.0>:{ssl_upgrade_error,"record overflow"}
        # and close TCP connection down. Investigation of this issue is likely
        # to take some time and to not be worth in as long as #post_init
        # works fine. MK.
        upgrade_to_tls_if_necessary
      rescue Exception => error
        raise error
      end # post_init



      # Called by EventMachine reactor once TCP connection is successfully estabilished.
      # @private
      def connection_completed
        # we only can safely set this value here because EventMachine is a lovely piece of
        # software that calls #post_init before #unbind even when TCP connection
        # fails. MK.
        @tcp_connection_established       = true
        # again, this is because #unbind is called in different situations
        # and there is no easy way to tell initial connection failure
        # from connection loss. Not in EventMachine 0.12.x, anyway. MK.
        @had_successfull_connected_before = true

        @reconnecting                     = false

        self.handshake
      end

      # @private
      def close_connection(*args)
        @intentionally_closing_connection = true

        super(*args)
      end

      # Called by EventMachine reactor when
      #
      # * We close TCP connection down
      # * Our peer closes TCP connection down
      # * There is a network connection issue
      # * Initial TCP connection fails
      # @private
      def unbind(exception = nil)
        if !@tcp_connection_established && !@had_successfull_connected_before && !@intentionally_closing_connection
          @tcp_connection_failed = true
          self.tcp_connection_failed
        end

        closing!
        @tcp_connection_established = false

        self.handle_connection_interruption
        @disconnection_deferrable.succeed

        closed!


        self.tcp_connection_lost if !@intentionally_closing_connection && @had_successfull_connected_before

        # since AMQP spec dictates that authentication failure is a protocol exception
        # and protocol exceptions result in connection closure, check whether we are
        # in the authentication stage. If so, it is likely to signal an authentication
        # issue. Java client behaves the same way. MK.
        if authenticating? && !@intentionally_closing_connection
          @on_possible_authentication_failure.call(@settings) if @on_possible_authentication_failure
        end
      end # unbind


      #
      # EventMachine receives data in chunks, sometimes those chunks are smaller
      # than the size of AMQP frame. That's why you need to add some kind of buffer.
      #
      # @private
      def receive_data(chunk)
        @chunk_buffer << chunk
        while frame = get_next_frame
          self.receive_frame(AMQ::Client::Framing::String::Frame.decode(frame))
        end
      end


      # Called by AMQ::Client::Connection after we receive connection.open-ok.
      # @api public
      def connection_successful
        @authenticating = false
        opened!

        @connection_deferrable.succeed
      end # connection_successful


      # Called by AMQ::Client::Connection after we receive connection.close-ok.
      #
      # @api public
      def disconnection_successful
        @disconnection_deferrable.succeed

        # true for "after writing buffered data"
        self.close_connection(true)
        self.reset
        closed!
      end # disconnection_successful





      self.handle(Protocol::Connection::Start) do |connection, frame|
        connection.start_ok(frame.decode_payload)
      end

      self.handle(Protocol::Connection::Tune) do |connection, frame|
        connection.handle_tune(frame.decode_payload)

        connection.open(connection.vhost)
      end

      self.handle(Protocol::Connection::OpenOk) do |connection, frame|
        connection.handle_open_ok(frame.decode_payload)
      end

      self.handle(Protocol::Connection::Close) do |connection, frame|
        connection.handle_close(frame.decode_payload)
      end

      self.handle(Protocol::Connection::CloseOk) do |connection, frame|
        connection.handle_close_ok(frame.decode_payload)
      end




      protected


      def reset
        @size    = 0
        @payload = ""
        @frames  = Array.new

        @chunk_buffer                 = ""
        @connection_deferrable        = EventMachine::DefaultDeferrable.new
        @disconnection_deferrable     = EventMachine::DefaultDeferrable.new

        # used to track down whether authentication succeeded. AMQP 0.9.1 dictates
        # that on authentication failure broker must close TCP connection without sending
        # any more data. This is why we need to explicitly track whether we are past
        # authentication stage to signal possible authentication failures.
        @authenticating           = false
      end

      def upgrade_to_tls_if_necessary
        tls_options = @settings[:ssl]

        if tls_options.is_a?(Hash)
          start_tls(tls_options)
        elsif tls_options
          start_tls
        end
      end # upgrade_to_tls_if_necessary
    end # EventMachineClient
  end # Client
end # AMQ