# encoding: utf-8

require "amq/client/logging"
require "amq/client/settings"
require "amq/client/entity"
require "amq/client/channel"

module AMQ
  # For overview of AMQP client adapters API, see {AMQ::Client::Adapter}
  module Client


    # Base adapter class. Specific implementations (for example, EventMachine-based, Cool.io-based or
    # sockets-based) subclass it and must implement Adapter API methods:
    #
    # * #send_raw(data)
    # * #estabilish_connection(settings)
    # * #close_connection
    #
    # @abstract
    module Adapter

      def self.included(host)
        host.extend ClassMethods
        host.extend ProtocolMethodHandlers

        host.class_eval do

          #
          # Behaviors
          #

          include Entity



          #
          # API
          #

          attr_accessor :logger
          attr_accessor :settings
          attr_accessor :connection

          # The locale defines the language in which the server will send reply texts.
          #
          # @see http://bit.ly/htCzCX AMQP 0.9.1 protocol documentation (Section 1.4.2.2)
          attr_accessor :locale

          # Client capabilities
          #
          # @see http://bit.ly/htCzCX AMQP 0.9.1 protocol documentation (Section 1.4.2.2.1)
          attr_accessor :client_properties

          # Server capabilities
          #
          # @see http://bit.ly/htCzCX AMQP 0.9.1 protocol documentation (Section 1.4.2.1.3)
          attr_reader :server_properties

          # Authentication mechanism used.
          #
          # @see http://bit.ly/htCzCX AMQP 0.9.1 protocol documentation (Section 1.4.2.2)
          attr_reader :mechanism

          # Channels within this connection.
          #
          # @see http://bit.ly/hw2ELX AMQP 0.9.1 specification (Section 2.2.5)
          attr_reader :channels

          # Maximum channel number that the server permits this connection to use.
          # Usable channel numbers are in the range 1..channel_max.
          # Zero indicates no specified limit.
          #
          # @see http://bit.ly/htCzCX AMQP 0.9.1 protocol documentation (Sections 1.4.2.5.1 and 1.4.2.6.1)
          attr_accessor :channel_max

          # Maximum frame size that the server permits this connection to use.
          #
          # @see http://bit.ly/htCzCX AMQP 0.9.1 protocol documentation (Sections 1.4.2.5.2 and 1.4.2.6.2)
          attr_accessor :frame_max


          attr_reader :known_hosts



          # @api plugin
          # @see #disconnect
          # @note Adapters must implement this method but it is NOT supposed to be used directly.
          #       AMQ protocol defines two-step process of closing connection (send Connection.Close
          #       to the peer and wait for Connection.Close-Ok), implemented by {Adapter#disconnect}
          def close_connection
            raise MissingInterfaceMethodError.new("AMQ::Client.close_connection")
          end unless defined?(:close_connection) # since it is a module, this method may already be defined
        end
      end # self.included(host)



      module ClassMethods
        # Settings
        def settings
          @settings ||= AMQ::Client::Settings.default
        end

        def logger
          @logger ||= begin
                        require "logger"
                        Logger.new(STDERR)
                      end
        end

        def logger=(logger)
          methods = AMQ::Client::Logging::REQUIRED_METHODS
          unless methods.all? { |method| logger.respond_to?(method) }
            raise AMQ::Client::Logging::IncompatibleLoggerError.new(methods)
          end

          @logger = logger
        end

        # @return [Boolean] Current value of logging flag.
        def logging
          settings[:logging]
        end

        # Turns loggin on or off.
        def logging=(boolean)
          settings[:logging] = boolean
        end


        # Establishes connection to AMQ broker and returns it. New connection object is yielded to
        # the block if it is given.
        #
        # @example Specifying adapter via the :adapter option
        #   AMQ::Client::Adapter.connect(:adapter => "socket")
        # @example Specifying using custom adapter class
        #   AMQ::Client::SocketClient.connect
        # @param [Hash] Connection parameters, including :adapter to use.
        # @api public
        def connect(settings = nil, &block)
          @settings = Settings.configure(settings)

          instance = self.new
          instance.establish_connection(settings)
          instance.register_connection_callback(&block)

          instance
        end


        # Can be overriden by higher-level libraries like amqp gem or bunny.
        # Defaults to AMQ::Client::TCPConnectionFailed.
        #
        # @return [Class]
        def tcp_connection_failure_exception_class
          @tcp_connection_failure_exception_class ||= AMQ::Client::TCPConnectionFailed
        end # tcp_connection_failure_exception_class

        # Can be overriden by higher-level libraries like amqp gem or bunny.
        # Defaults to AMQ::Client::PossibleAuthenticationFailure.
        #
        # @return [Class]
        def authentication_failure_exception_class
          @authentication_failure_exception_class ||= AMQ::Client::PossibleAuthenticationFailureError
        end # authentication_failure_exception_class
      end # ClassMethods


      #
      # Behaviors
      #

      include AMQ::Client::Openable

      extend RegisterEntityMixin

      register_entity :channel,  AMQ::Client::Channel


      #
      # API
      #


      # Establish socket connection to the server.
      #
      # @api plugin
      def establish_connection(settings)
        raise MissingInterfaceMethodError.new("AMQ::Client#establish_connection(settings)")
      end

      # Properly close connection with AMQ broker, as described in
      # section 2.2.4 of the {http://bit.ly/hw2ELX AMQP 0.9.1 specification}.
      #
      # @api  plugin
      # @see  #close_connection
      def disconnect(reply_code = 200, reply_text = "Goodbye", class_id = 0, method_id = 0, &block)
        @intentionally_closing_connection = true
        self.on_disconnection(&block)

        # ruby-amqp/amqp#66, MK.
        if self.open?
          closing!
          self.send_frame(Protocol::Connection::Close.encode(reply_code, reply_text, class_id, method_id))
        elsif self.closing?
          # no-op
        else
          self.disconnection_successful
        end
      end


      # Sends AMQ protocol header (also known as preamble).
      #
      # @note This must be implemented by all AMQP clients.
      # @api plugin
      # @see http://bit.ly/hw2ELX AMQP 0.9.1 specification (Section 2.2)
      def send_preamble
        self.send_raw(AMQ::Protocol::PREAMBLE)
      end

      # Sends frame to the peer, checking that connection is open.
      #
      # @raise [ConnectionClosedError]
      def send_frame(frame)
        if closed?
          raise ConnectionClosedError.new(frame)
        else
          self.send_raw(frame.encode)
        end
      end

      # Sends multiple frames, one by one.
      #
      # @api public
      def send_frameset(frames)
        frames.each { |frame| self.send_frame(frame) }
      end # send_frameset(frames)



      # Returns heartbeat interval this client uses, in seconds.
      # This value may or may not be used depending on broker capabilities.
      # Zero means the server does not want a heartbeat.
      #
      # @return  [Fixnum]  Heartbeat interval this client uses, in seconds.
      # @see http://bit.ly/htCzCX AMQP 0.9.1 protocol documentation (Section 1.4.2.6)
      def heartbeat_interval
        @settings[:heartbeat] || @settings[:heartbeat_interval] || 0
      end # heartbeat_interval


      # vhost this connection uses. Default is "/", a historically estabilished convention
      # of RabbitMQ and amqp gem.
      #
      # @return [String] vhost this connection uses
      # @api public
      def vhost
        @settings.fetch(:vhost, "/")
      end # vhost


      # Called when previously established TCP connection fails.
      # @api public
      def tcp_connection_lost
        @on_tcp_connection_loss.call(self, @settings) if @on_tcp_connection_loss
      end

      # Called when initial TCP connection fails.
      # @api public
      def tcp_connection_failed
        @on_tcp_connection_failure.call(@settings) if @on_tcp_connection_failure
      end



      #
      # Implementation
      #


      # Sends opaque data to AMQ broker over active connection.
      #
      # @note This must be implemented by all AMQP clients.
      # @api plugin
      def send_raw(data)
        raise MissingInterfaceMethodError.new("AMQ::Client#send_raw(data)")
      end

      # Sends connection preamble to the broker.
      # @api plugin
      def handshake
        @authenticating = true
        self.send_preamble
      end


      # Sends connection.open to the server.
      #
      # @api plugin
      # @see http://bit.ly/htCzCX AMQP 0.9.1 protocol documentation (Section 1.4.2.7)
      def open(vhost = "/")
        self.send_frame(Protocol::Connection::Open.encode(vhost))
      end

      # Resets connection state.
      #
      # @api plugin
      def reset_state!
        # no-op by default
      end # reset_state!

      # @api plugin
      # @see http://tools.ietf.org/rfc/rfc2595.txt RFC 2595
      def encode_credentials(username, password)
        "\0#{username}\0#{password}"
      end # encode_credentials(username, password)


      # Processes a single frame.
      #
      # @param [AMQ::Protocol::Frame] frame
      # @api plugin
      def receive_frame(frame)
        @frames << frame
        if frameset_complete?(@frames)
          receive_frameset(@frames)
          @frames.clear
        else
          # puts "#{frame.inspect} is NOT final"
        end
      end

      # Processes a frameset by finding and invoking a suitable handler.
      # Heartbeat frames are treated in a special way: they simply update @last_server_heartbeat
      # value.
      #
      # @param [Array<AMQ::Protocol::Frame>] frames
      # @api plugin
      def receive_frameset(frames)
        frame = frames.first

        if Protocol::HeartbeatFrame === frame
          @last_server_heartbeat = Time.now
        else
          if callable = AMQ::Client::HandlersRegistry.find(frame.method_class)
            callable.call(self, frames.first, frames[1..-1])
          else
            raise MissingHandlerError.new(frames.first)
          end
        end
      end

      # Sends a heartbeat frame if connection is open.
      # @api plugin
      def send_heartbeat
        if tcp_connection_established?
          if @last_server_heartbeat < (Time.now - (self.heartbeat_interval * 2))
            logger.error "Reconnecting due to missing server heartbeats"
            # TODO: reconnect
          end
          send_frame(Protocol::HeartbeatFrame)
        end
      end # send_heartbeat



      # Handles connection.start.
      #
      # @api plugin
      # @see http://bit.ly/htCzCX AMQP 0.9.1 protocol documentation (Section 1.4.2.1.)
      def handle_start(connection_start)
        @server_properties = connection_start.server_properties

        username = @settings[:user] || @settings[:username]
        password = @settings[:pass] || @settings[:password]

        # It's not clear whether we should transition to :opening state here
        # or in #open but in case authentication fails, it would be strange to have
        # @status undefined. So lets do this. MK.
        opening!

        self.send_frame(Protocol::Connection::StartOk.encode(@client_properties, @mechanism, self.encode_credentials(username, password), @locale))
      end


      # Handles Connection.Tune-Ok.
      #
      # @api plugin
      # @see http://bit.ly/htCzCX AMQP 0.9.1 protocol documentation (Section 1.4.2.6)
      def handle_tune(tune_ok)
        @channel_max        = tune_ok.channel_max
        @frame_max          = tune_ok.frame_max
        @heartbeat_interval = self.heartbeat_interval || tune_ok.heartbeat

        self.send_frame(Protocol::Connection::TuneOk.encode(@channel_max, [settings[:frame_max], @frame_max].min, @heartbeat_interval))
      end # handle_tune(method)


      # Handles Connection.Open-Ok.
      #
      # @api plugin
      # @see http://bit.ly/htCzCX AMQP 0.9.1 protocol documentation (Section 1.4.2.8.)
      def handle_open_ok(open_ok)
        @known_hosts = open_ok.known_hosts

        opened!
        self.connection_successful if self.respond_to?(:connection_successful)
      end


      # Handles connection.close. When broker detects a connection level exception, this method is called.
      #
      # @api plugin
      # @see http://bit.ly/htCzCX AMQP 0.9.1 protocol documentation (Section 1.5.2.9)
      def handle_close(conn_close)
        self.handle_connection_interruption

        closed!
        # TODO: use proper exception class, provide protocol class (we know conn_close.class_id and conn_close.method_id) as well!
        error = RuntimeError.new(conn_close.reply_text)
        self.error(error)
      end


      # Handles Connection.Close-Ok.
      #
      # @api plugin
      # @see http://bit.ly/htCzCX AMQP 0.9.1 protocol documentation (Section 1.4.2.10)
      def handle_close_ok(close_ok)
        closed!
        self.disconnection_successful
      end # handle_close_ok(close_ok)

      # @api plugin
      def handle_connection_interruption
        @channels.each { |n, c| c.handle_connection_interruption }
      end # handle_connection_interruption



      protected

      # Returns next frame from buffer whenever possible
      #
      # @api private
      def get_next_frame
        return nil unless @chunk_buffer.size > 7 # otherwise, cannot read the length
        # octet + short
        offset = 3 # 1 + 2
        # length
        payload_length = @chunk_buffer[offset, 4].unpack(AMQ::Protocol::PACK_UINT32).first
        # 4 bytes for long payload length, 1 byte final octet
        frame_length = offset + payload_length + 5
        if frame_length <= @chunk_buffer.size
          @chunk_buffer.slice!(0, frame_length)
        else
          nil
        end
      end # def get_next_frame

      # Utility methods

      # Determines, whether the received frameset is ready to be further processed
      def frameset_complete?(frames)
        return false if frames.empty?
        first_frame = frames[0]
        first_frame.final? || (first_frame.method_class.has_content? && content_complete?(frames[1..-1]))
      end

      # Determines, whether given frame array contains full content body
      def content_complete?(frames)
        return false if frames.empty?
        header = frames[0]
        raise "Not a content header frame first: #{header.inspect}" unless header.kind_of?(AMQ::Protocol::HeaderFrame)
        header.body_size == frames[1..-1].inject(0) {|sum, frame| sum + frame.payload.size }
      end

    end # Adapter
  end # Client
end # AMQ