require 'amqp/frame'

module AMQP
  class Error < StandardError; end

  module BasicClient
    def process_frame frame
      if mq = channels[frame.channel]
        mq.process_frame(frame)
        return
      end

      case frame
      when Frame::Method
        case method = frame.payload
        when Protocol::Connection::Start
          send Protocol::Connection::StartOk.new({:platform => 'Ruby/EventMachine',
                                                  :product => 'AMQP',
                                                  :information => 'http://github.com/tmm1/amqp',
                                                  :version => VERSION},
                                                 'AMQPLAIN',
                                                 {:LOGIN => @settings[:user],
                                                  :PASSWORD => @settings[:pass]},
                                                 'en_US')

        when Protocol::Connection::Tune
          send Protocol::Connection::TuneOk.new(:channel_max => 0,
                                                :frame_max => 131072,
                                                :heartbeat => 0)

          send Protocol::Connection::Open.new(:virtual_host => @settings[:vhost],
                                              :capabilities => '',
                                              :insist => @settings[:insist])

        when Protocol::Connection::OpenOk
          succeed(self)

        when Protocol::Connection::Redirect
          (redir_host, redir_port) = method.host.split(":")
          redir_port = redir_port.to_i #important!
          STDERR.puts "Redirecting to #{redir_host}:#{redir_port}"
          old_on_disconnect = @on_disconnect
          @on_disconnect = Proc.new do
            @on_disconnect = old_on_disconnect
            reconnect true, redir_host, redir_port
          end
          
        when Protocol::Connection::Close
          # raise Error, "#{method.reply_text} in #{Protocol.classes[method.class_id].methods[method.method_id]}"
          STDERR.puts "#{method.reply_text} in #{Protocol.classes[method.class_id].methods[method.method_id]}"

        when Protocol::Connection::CloseOk
          @on_disconnect.call if @on_disconnect
        end
      end
    end
  end

  def self.client
    @client ||= BasicClient
  end

  def self.client= mod
    mod.__send__ :include, AMQP
    @client = mod
  end

  module Client
    include EM::Deferrable

    def initialize opts = {}
      @settings = opts
      extend AMQP.client

      @on_disconnect ||= method(:disconnected)

      timeout @settings[:timeout] if @settings[:timeout]
      errback{ @on_disconnect.call } unless @reconnecting

      @connected = false
    end

    def connection_completed
      start_tls if @settings[:ssl]
      log 'connected'
      # @on_disconnect = proc{ raise Error, 'Disconnected from server' }
      unless @closing
        Client.connection_succeeded!
        @on_disconnect = method(:disconnected)
      end

      @connected = true
      @connection_status.call(:connected) if @connection_status

      @buf = Buffer.new
      send_data HEADER
      send_data [1, 1, VERSION_MAJOR, VERSION_MINOR].pack('C4')
    end

    def connected?
      @connected
    end

    def unbind
      log 'disconnected'
      @connected = false
      EM.next_tick{ @on_disconnect.call }
    end

    def add_channel mq
      (@_channel_mutex ||= Mutex.new).synchronize do
        channels[ key = (channels.keys.max || 0) + 1 ] = mq
        key
      end
    end

    def channels
      @channels ||= {}
    end

    def receive_data data
      # log 'receive_data', data
      @buf << data

      while frame = Frame.parse(@buf)
        log 'receive', frame
        process_frame frame
      end
    end

    def process_frame frame
      # this is a stub meant to be
      # replaced by the module passed into initialize
    end

    def send data, opts = {}
      channel = opts[:channel] ||= 0
      data = data.to_frame(channel) unless data.is_a? Frame
      data.channel = channel

      log 'send', data
      send_data data.to_s
    end

    #:stopdoc:
    # def send_data data
    #   log 'send_data', data
    #   super
    # end
    #:startdoc:

    def close &on_disconnect
      if on_disconnect
        @closing = true
        @on_disconnect = proc{
          on_disconnect.call
          @closing = false
        }
      end

      callback{ |c|
        if c.channels.any?
          c.channels.each do |ch, mq|
            mq.close
          end
        else
          send Protocol::Connection::Close.new(:reply_code => 200,
                                               :reply_text => 'Goodbye',
                                               :class_id => 0,
                                               :method_id => 0)
        end
      }
    end

    def reconnect force = false, use_host = nil, use_port = nil
      if @reconnecting and not force
        # wait 1 second after first reconnect attempt, in between each subsequent attempt
        EM.add_timer(@settings[:reconnect_timer] || 1){ reconnect(true) }
        return
      end

      unless @reconnecting
        @reconnecting = true

        @deferred_status = nil
        initialize(@settings)

        mqs = @channels
        @channels = {}
        mqs.each{ |_,mq| mq.reset } if mqs
      end

      log 'reconnecting'
      begin
        if use_host && use_port
          (try_host, try_port) = [use_host, use_port]
        else
          (try_host, try_port) = Client.determine_reconnect_server(@settings)
          STDERR.puts "Reconnecting to #{try_host}:#{try_port}"          
        end
        EM.reconnect try_host, try_port, self
      rescue RuntimeError => e
        STDERR.puts "'#{e.message}' on reconnect to #{try_host}:#{try_port}"
        return reconnect if e.message == "no connection"
        raise e
      end
    end
    
    def self.determine_reconnect_server(opts)
      try_host = opts[:host]
      try_port = opts[:port]
      @retry_count ||= 0
      if opts[:max_retry] && @retry_count >= opts[:max_retry]
        raise "max_retry (#{@retry_count}) reached, disconnecting"
      end
      if srv_list = opts[:fallback_servers]
        @server_to_select ||= 0
        idex = @server_to_select % (srv_list.size + 1)
        if idex != 0
          try = srv_list[idex - 1]
          try_host = try[:host] || AMQP.settings[:host]
          try_port = try[:port] || AMQP.settings[:port]
        end
        @server_to_select += 1
      end      
      @retry_count += 1
      [try_host, try_port]
    end
    
    def self.connection_succeeded!
      @retry_count = 0
    end

    def self.connect opts = {}
      opts = AMQP.settings.merge(opts)
      max_retry = ((opts[:fallback_servers] || []).size + 1) * 3
      begin
        (try_host, try_port) = determine_reconnect_server(opts)
        EM.connect try_host, try_port, self, opts
        connection_succeeded!
      rescue RuntimeError => e
        STDERR.puts "'#{e.message}' on connect to #{try_host}:#{try_port}"
        retry if e.message == "no connection" && @retry_count < max_retry
        raise e
      end
    end

    def connection_status &blk
      @connection_status = blk
    end

    private

    def disconnected
      @connection_status.call(:disconnected) if @connection_status
      reconnect
    end

    def log *args
      return unless @settings[:logging] or AMQP.logging
      require 'pp'
      pp args
      puts
    end
  end
end