# -*- coding: utf-8 -*-
require 'tengine/mq'

require 'active_support/version'
require 'active_support/core_ext/hash/deep_merge'
require 'tengine/support/core_ext/hash/compact'
require 'tengine/support/core_ext/hash/deep_dup'
require 'tengine/support/core_ext/hash/keys'
require 'tengine/support/core_ext/enumerable/each_next_tick'
require 'tengine/support/core_ext/enumerable/deep_freeze'
require 'tengine/support/core_ext/module/private_constant'
require 'amqp'
require 'amqp/extensions/rabbitmq'

class Tengine::Mq::Suite

  #######
  private
  #######

  PendingEvent = Struct.new :tag, :sender, :event, :opts, :retry, :block
  private_constant :PendingEvent

  # This is to accumulate a set of exceptions happend at a series of executions.
  class ExceptionsContainer < RuntimeError
    def initialize
      super
      @set = Array.new
    end

    # diagnostics
    def message
      msgs = @set.map {|i| i.message }.join "\n\t"
      sprintf "multiple exceptions are reported.\n\t%s", msgs
    end

    def << e
      @set << e
    end

    def raise
      case @set.size
      when 0
        # no exceptions
      when 1
        # only one exception happened inside
        Kernel.raise @set.first
      else
        # multiple.
        super
      end
    end
  end
  private_constant :ExceptionsContainer

  # Some (not all) of the descriptions below are quoted from the AMQP gem's yardoc.
  #
  # @param                [Hash]  cfg                   Tons  of  optional  arguments  can  be  specified and  they  are  settings  for
  #                                                     connections, queues, and senders.  But they are all optional, i.e. you are 100%
  #                                                     safe to omit the whole.  Specify only what you concern.
  # @option cfg           [Hash] :sender                Configurations for message sender, see below.
  # @option cfg           [Hash] :connection            Configurations for MQ connection, see below.
  # @option cfg           [Hash] :channel               Configurations for MQ channel, see below.
  # @option cfg           [Hash] :exchange              Configurations for MQ exchange, see below.
  # @option cfg           [Hash] :queue                 Configurations for MQ queue, see below.
  # @option sender     [Boolean] :keep_connection       Whether a  connection shall be shut  down after transmitted a  message, or not.
  #                                                     If  set,  a  sender  can  eventually  shut your  reactor  down  and  the  whole
  #                                                     EventMachine loop can be abandoned.
  # @option sender     [Numeric] :retry_interval        Seconds to wait before attempting  to retransmit a message after failure.  Zero
  #                                                     means an immediate retry so watch out.
  # @option sender     [Integer] :retry_count           Max count of retry  attempts.  Set zero here to stop sender  from even think of
  #                                                     retrying.
  # @option connection  [String] :user                  Authentication info.
  # @option connection  [String] :pass                  Authentication info.
  # @option connection  [String] :host                  Where to connect.
  # @option connection  [String] :port                  Where to connect.
  # @option connection  [String] :vhost                 The AMQP virtual host.
  # @option connection [Numeric] :timeout               Connection timeout in secs.
  # @option connection [Boolean] :logging               ?? Description TBD ??
  # @option connection [Boolean] :insist                ?? Description TBD ??
  # @option connection [Numeric] :auto_reconnect_delay  When set, a  TCP session loss yields an automatic  reconnect attempt, with this
  #                                                     delay (in secs).  Without it no reconnection attempts are made.
  # @option channel    [Numeric] :prefetch              Specifies number of messages to prefetch.  Learn more: AMQP's QoS features.
  # @option channel    [Boolean] :auto_recovery         Turns on automatic network failure recovery mode for the channel.  *Note* it is
  #                                                     highly  recommended  that you  leave  this  flag  untouched (default  enabled).
  #                                                     Otherwise you  have to have 100% control  over how to recover  your channel and
  #                                                     its dependent queues/exchanges.  That should be doable via #add_hook though.
  # @option exchange    [String] :name                  Explicit  name to  use,  or  empty (i.e.  "")  to let  the  broker allocate  an
  #                                                     appropriate name.
  # @option exchange    [Symbol] :type                  One of direct, fanout, topic, or headers.
  # @option exchange      [Hash] :publish               Default options for publishing messages.  See below.
  # @option exchange   [Boolean] :passive               If  set, the  server  will  not create  the  exchange if  it  does not  already
  #                                                     exist. The  client can  use this  to check whether  an exchange  exists without
  #                                                     modifying the server state.
  # @option exchange   [Boolean] :durable               If  set  when  creating  a  new  exchange,  the  exchange  will  be  marked  as
  #                                                     durable.  Durable exchanges  and their  bindings  are recreated  upon a  server
  #                                                     restart  (information  about   them  is  persisted).   Non-durable  (transient)
  #                                                     exchanges do not  survive if/when a server restarts  (information about them is
  #                                                     stored exclusively in RAM).
  # @option exchange   [Boolean] :auto_delete           If set,  the exchange is  deleted when all  queues have finished using  it. The
  #                                                     server waits  for a  short period  of time before  determining the  exchange is
  #                                                     unused to give time to the client code to bind a queue to it.
  # @option exchange   [Boolean] :internal              If set,  the exchange  may not be  used directly  by publishers, but  only when
  #                                                     bound to other exchanges. Internal  exchanges are used to construct wiring that
  #                                                     is not visible to applications. *This is a RabbitMQ-specific extension.*
  # @option exchange   [Boolean] :nowait                If set, the server will not respond  to the method.  The client should not wait
  #                                                     for a reply method.  If the server  could not complete the method it will raise
  #                                                     a channel or connection exception.
  # @option exchange   [Boolean] :no_declare            If set,  exchange declaration command  won't be sent  to the broker.  Allows to
  #                                                     forcefully  avoid declaration.  We recommend  that only  experienced developers
  #                                                     consider this option.
  # @option exchange    [String] :default_routing_key   Default routing key that will be used by AMQP::Exchange#publish when no routing
  #                                                     key is not passed explicitly.
  # @option exchange      [Hash] :arguments             A hash of optional arguments with the declaration.  Some brokers implement AMQP
  #                                                     extensions using x-prefixed declaration arguments.
  # @option publish     [String] :routing_key           Specifies message routing key.  Routing key determines what queues messages are
  #                                                     delivered to (exact routing algorithms vary between exchange types).
  # @option publish    [Boolean] :mandatory             This flag tells  the server how to react  if the message cannot be  routed to a
  #                                                     queue. If message is mandatory,  the server will return unroutable message back
  #                                                     to the client  with basic.return AMQP method.  If message is not mandatory, the
  #                                                     server silently drops the message.
  # @option publish    [Boolean] :immediate             This flag tells  the server how to react  if the message cannot be  routed to a
  #                                                     queue consumer  immediately.  If this  flag is set,  the server will  return an
  #                                                     undeliverable message with  a Return method.  If this flag  is zero, the server
  #                                                     will queue the message, but with no guarantee that it will ever be consumed.
  # @option publish    [Boolean] :persistent            When true, this message will be persisted to disk and remain in the queue until
  #                                                     it is consumed. When  false, the message is only kept in  a transient store and
  #                                                     will lost  in case of  server restart.  When  performance and latency  are more
  #                                                     important than  durability, set  :persistent => false.   If durability  is more
  #                                                     important, set :persistent => true.
  # @option publish     [String] :content_type          Content-type of message payload.
  # @option queue       [String] :name                  Explicit  name to  use,  or  empty (i.e.  "")  to let  the  broker allocate  an
  #                                                     appropriate name.
  # @option queue         [Hash] :subscribe             Default options for subscribing.  See below.
  # @option queue      [Boolean] :passive               If set, the server will not create  the queue if it does not already exist. The
  #                                                     client can  use this to  check whether the  queue exists without  modifying the
  #                                                     server state.
  # @option queue      [Boolean] :durable               If set when creating a new queue, the queue will be marked as durable.  Durable
  #                                                     queues  remain active when  a server  restarts.  Non-durable  queues (transient
  #                                                     queues) are purged if/when a server  restarts.  Note that durable queues do not
  #                                                     necessarily hold persistent  messages, although it does not  make sense to send
  #                                                     persistent messages to a transient queue (though it is allowed).
  # @option queue      [Boolean] :exclusive             Exclusive queues may only be  consumed from by the current connection.  Setting
  #                                                     the 'exclusive'  flag always implies  'auto-delete'. Only a single  consumer is
  #                                                     allowed  to  remove  messages  from   the  queue.   The  default  is  a  shared
  #                                                     queue. Multiple clients may consume messages from the queue.
  # @option queue      [Boolean] :auto_delete           If set, the  queue is deleted when all consumers have  finished using it.  Last
  #                                                     consumer  can  be  cancelled  either  explicitly  or  because  its  channel  is
  #                                                     closed. If there was no consumer ever on the queue, it won't be deleted.
  # @option queue      [Boolean] :nowait                If set, the server will not respond  to the method.  The client should not wait
  #                                                     for a reply method.  If the server  could not complete the method it will raise
  #                                                     a channel or connection exception.
  # @option queue         [Hash] :arguments             A hash of optional arguments with the declaration.  Some brokers implement AMQP
  #                                                     extensions  using  x-prefixed  declaration  arguments.  For  example,  RabbitMQ
  #                                                     recognizes x-message-ttl declaration arguments  that defines TTL of messages in
  #                                                     the queue.
  # @option subscribe  [Boolean] :ack                   If this  field is set to false  the server does not  expect acknowledgments for
  #                                                     messages.   That is,  when a  message  is delivered  to the  client the  server
  #                                                     automatically  and silently  acknowledges it  on  behalf of  the client.   This
  #                                                     functionality increases  performance but at the cost  of reliability.  Messages
  #                                                     can get lost if a client dies before it can deliver them to the application.
  # @option subscribe   [Boolean] :nowait               If set, the server  will not respond to the method. The  client should not wait
  #                                                     for a reply method.  If the server  could not complete the method it will raise
  #                                                     a channel or connection exception.
  # @option subscribe     [#call] :confirm              If set, this  proc will be called when the server  confirms subscription to the
  #                                                     queue with  a basic.consume-ok message. Setting this  option will automatically
  #                                                     set :nowait => false. This is required for the server to send a confirmation.
  # @option subscribe   [Boolean] :exclusive            Request exclusive  consumer access, meaning  only this consumer can  access the
  #                                                     queue.   This  is  useful  when  you  want a  long-lived  shared  queue  to  be
  #                                                     temporarily  accessible by  just one  application (or  thread, or  process). If
  #                                                     application exclusive consumer  is part of crashes or  loses network connection
  #                                                     to the broker, channel is closed and exclusive consumer is thus cancelled.
  def initialize cfg          =  Hash.new
    @terminating              =  false
    @mutex                    =  Mutex.new
    @condvar                  =  ConditionVariable.new
    @setting_up               =  Hash.new
    @state                    =  :uninitialized # see setup_handshake for possible values
    @firing_queue             =  EM::Queue.new
    @publishing_events        =  Array.new
    @retrying_events          =  Hash.new
    @pending_events           =  Hash.new
    @hooks                    =  Hash.new do |h, k| h.store k, Array.new end
    @config                   =  {
      :sender                 => {
        :keep_connection      => false,
        :retry_interval       => 1,  # in seconds
        :retry_count          => 30,
      },
      :connection             => {
        :user                 => 'guest',
        :pass                 => 'guest',
        :vhost                => '/',
        :logging              => false,
        :insist               => false,
        :host                 => 'localhost',
        :port                 => 5672,
        :auto_reconnect_delay => 1, # in seconds
      },
      :channel                => {
        :prefetch             => 1,
        :auto_recovery        => true,
      },
      :exchange               => {
        :name                 => 'tengine_event_exchange',
        :type                 => :direct,
        :passive              => false,
        :durable              => true,
        :auto_delete          => false,
        :internal             => false,
        :nowait               => false,
        :publish              => {
          :content_type       => "application/json", # RFC4627
          :persistent         => true,
        },
      },
      :queue                  => {
        :name                 => 'tengine_event_queue',
        :passive              => false,
        :durable              => true,
        :auto_delete          => false,
        :exclusive            => false,
        :nowait               => false,
        :subscribe            => {
          :ack                => true,
          :nowait             => false,
          :confirm            => nil,
        },
      },
    }
    @config.deep_merge! cfg.to_hash.deep_symbolize_keys.compact
    @config.deep_freeze
    install_default_hooks
  end

  ######
  public
  ######

  attr_reader :config

  # @yield        [args]       Given block is called when the hook condition met.
  # @yieldparam  [Array] args  Any arguments passed to the callback are passed through.
  # @param      [Symbol] name  Hook name to add
  def add_hook name, &block
    raise ArgumentError, "no block given" unless block_given?
    synchronize do
      @hooks[name.intern] << block
    end
  end

  # @yield      [header, payload]             Given block is called every time a message was received by the queue.
  # @yieldparam    [AMQP::Header]  header     Message metadata.
  # @yieldparam          [String]  payload    Message entity.
  # @param                 [Hash]  cfg        Subscription options
  # @option opts        [Boolean] :ack        Whether the broker (not us) expects acknowledgements from our side.  If this is true, you
  #                                           have  to call  header.ack somewhere  inside the  block, and  unacknowledged  messages are
  #                                           re-sent later.   Otherwise you need not to  call header.ack, and the  server doesn't know
  #                                           your sudden death or packet loss or network problems.
  # @option opts        [Boolean] :nowait     With this flag  on, like other :nowait  cases, the request is dealt  silently.  Don't get
  #                                           confused: the broker do  not respond to the subscribe request, but  does push messages to
  #                                           us. i.e. this flag only affects to :confirm optional argument.
  # @option opts          [#call] :confirm    This is called when the broker replied  your subscription.  I have never seen this called
  #                                           twice.  This argument assumes :nowait => false.
  # @option opts        [Boolean] :exclusive  Request exclusive consumer access, meaning only this consumer can access the queue.  When
  #                                           you experience a  network problem, exclusive access is cancelled.  Which  itself is not a
  #                                           strange behaviour, but if you do  a auto-recover the exclusivity might suddenly lost.  So
  #                                           beware.
  def subscribe cfg = Hash.new
    raise ArgumentError, "no block given" unless block_given?
    ensures :queue do |q|
      opts = @config[:queue][:subscribe].merge cfg.compact
      q.subscribe opts do |h, b|
        yield h, b
      end
    end
  end

  # @yield                                  [cok]          Given  block is  called  after  it had  successfully  ubsubscribed from  the
  #                                                        broker.
  # @yieldparam [AMQP::Protocol::Basic::CancelOK]  cok     Message metadata.  *Note* can be nil when you set nowait: true.
  # @param                                 [Hash]  cfg     Subscription options
  # @option opts                        [Boolean] :nowait  With this flag on, like other  :nowait cases, the request is dealt silently.
  #                                                        The block is called anyway though.
  def unsubscribe cfg = Hash.new
    raise ArgumentError, "no block given" unless block_given?
    synchronize do
      if ivar? :queue and @queue.default_consumer
        cfg[:nowait] = cfg.fetch :nowait, false
        if cfg[:nowait]
          @queue.unsubscribe cfg
          yield nil
        else
          @queue.unsubscribe cfg do |cok|
            yield cok
          end
        end
      else
        logger :warn, "unsubscribe called but not subscribed"
        yield nil
      end
    end
  end

  # You don't have to understand it.  Use Tengine::Event::Sender.
  #
  # @param [Tengine::Event::Sender]  sender           Event sender
  # @param         [Tengine::Event]  event            Event to submit
  # @param                   [Hash]  opts             Options to pass to the publisher
  # @param                  [#call]  block            Callback to be triggered *after* the transmission.
  # @option opts          [Boolean] :keep_connection  Whether a connection shall be shut  down after transmitted a message, or not.  If
  #                                                   set, a  sender can eventually shut  your reactor down and  the whole EventMachine
  #                                                   loop can be abondoned.
  # @option opts          [Numeric] :retry_interval   Seconds to wait before attempting to retransmit a message after failure.
  # @option opts          [Numeric] :retry_count      Max count of retry attempts.
  def fire sender, event, opts, block
    cfg = @config[:sender].merge opts.compact
    e = PendingEvent.new 0, sender, event, cfg, 0, block
    synchronize do
      @pending_events[e] = true
      case @state when :disconnected
        # wait for next connection
        @retrying_events[e] = [nil, Time.at(0)]
      else
        @firing_queue.push e # serialize
        trigger_firing_thread if @firing_queue.size <= 1 # first kick
      end
    end
  end

  # stops the suite.
  def stop
    # べつに何も難しいことがしたいわけではなくて最終的にp0を呼べばいいんだけど、EMがいるかいないか、@connectionがいるかいないかの条件分
    # けで無駄に長いメソッドになっている。

    p0 = lambda do
      EM.cancel_timer @reconnection_timer if ivar? :reconnection_timer
      @retrying_events.each_value do |(idx, *)|
        EM.stop_timer idx if idx
      end
      @retrying_events.clear
      stop_firing_queue

      @state = :uninitialized # この後またEM.run{ .. }されるかも
      @setting_up.clear
      @firing_queue = EM::Queue.new
      @connection = nil
      @channel = nil
      @queue = nil
      @exchange = nil
      @reconnection_timer = nil
      GC.start # 気休め

      logger :info, "OK, stopped.  Good bye."
      if block_given? then
        yield
      else
        EM.stop
      end
    end

    p1 = lambda do
      if ivar? :connection
        @connection.disconnect do
          synchronize do
            p0.call
          end
        end
      else
        p0.call
      end
    end

    p2 = lambda do
      if ivar? :channel
        @channel.close do
          synchronize do
            p1.call
          end
        end
      else
        p1.call
      end
    end

    p3 = lambda do
      if ivar? :queue and @queue.default_consumer
        @queue.unsubscribe :nowait => false do
          synchronize do
            p2.call
          end
        end
      else
        p2.call
      end
    end

    p4 = lambda do
      synchronize do
        logger :info, "finishing up, now sending remaining events."
        @condvar.wait @mutex until @pending_events.empty?
      end
    end

    p5 = lambda do |a|
      synchronize do
        p3.call
      end
    end

    initiate_termination do
      if EM.reactor_running?
        EM.defer p4, p5
      elsif block_given?
        yield
      end
    end
  end

  # Declares that the  application is now terminating  this MQ connection.  No reconnection  / resend attempts are made  any more.  The
  # connection (if any) is  still open and you can push / pull  using it, but by calling this method you  hereby agree that no messages
  # involving this suite are reliable any longer.
  #
  # Yields after the declaration.
  def initiate_termination
    @terminating = true # FIXME: should be mutex-protected
    yield
  end

  def inspect
    sprintf "#<%p:%#x %s cfg=%p ev=%p hook=%p>", self.class, self.object_id, @state, @config, @pending_events, @hooks
  end

  def pretty_print pp
    pp.pp_object self
  end

  # used by pretty printer
  def pretty_print_instance_variables
    %w[@state @config @pending_events @hooks]
  end

  #######

  # @deprecated Do not use it.
  def connection; deprecated :connection end

  # @deprecated Do not use it.
  def channel; deprecated :channel end

  # @deprecated Do not use it.
  def exchange; deprecated :exchange end

  # @deprecated Do not use it.
  def queue; deprecated :queue end

  #######

  # @api private
  def pending_events
    synchronize do
      @pending_events.keys.select {|i| yield i }.map {|i| i.event }
    end
  end

  # @api private
  def pending_events_for sender
    pending_events do |i|
      i.sender == sender
    end
  end

  # @api private
  def self.pending? event
    e = ObjectSpace.each_object self
    e.any? do |obj|
      not obj.pending_events do |i| i.event == event end.empty?
    end
  end

  #######
  private
  #######

  # A thin Tengine.logger wrapper.  As this gem can be used without a logger, we have to work around it.
  # @param [Symbol] lv   One of debug, info, warn, error, fatal, or Logger constants.
  # @param [String] fmt  printf format
  # @param  [Array] argv printf variadic arguments
  def logger lv, fmt, *argv
    msg = sprintf fmt, *argv
    if defined? Tengine.logger
      Tengine.logger.send lv, msg
    else
      STDERR.puts msg
    end
  end

  # Wanted to avoid recursive mutex  deadlocking, so this convenient method.  But beware, recursive locking  situation is in fact a bad
  # habit (if not a bug), and we pay a considerable cost to avoid them here.  This method is far from being lightweight especially when
  # recursive locking happens.
  def synchronize
    begin
      @mutex.lock
    rescue ThreadError => e
      # A deadlock was detected, which means of course, we have the lock.
      bt = e.backtrace.join "\n\tfrom "
      logger :debug, "%s\n\tfrom %s", e, bt
    ensure
      begin
        return yield
      ensure
        begin
          @mutex.unlock
        rescue ThreadError => e
          # @mutex might  magically be unlocked...   For instance, the execution  context might have  been splitted from inside  of the
          # yielded block.  That case, the context who reached here do not own @mutex so should not unlock.
          bt = e.backtrace.join "\n\tfrom "
          logger :debug, "%s\n¥tfrom %s\n", e, bt
        end
      end
    end
  end

  # suppress warning on debug mode
  def ivar? name
    vid = "@#{name}"
    instance_variable_defined?(vid) && instance_variable_get(vid)
  end

  # misc also
  def rehash_them_all
    instance_variables.each do |i|
      obj = instance_variable_get i
      case obj when Hash then
        obj.rehash unless obj.frozen?
      end
    end
  end

  #######

  # Generates a callback according to klass and mid
  # @param [String] klass callback category
  # @param [Symbol] mid   callback ID
  # @return [Proc] a callback.
  def callback_entity klass, mid
    lambda do |*argv|
      exceptions = ExceptionsContainer.new
      begin
        @hooks[:everything].reverse_each do |proc|
          begin
            proc.yield klass, mid, argv
          rescue Exception => e
            exceptions << e
          end
        end

        @hooks[:"#{klass}.#{mid}"].reverse_each do |proc|;
          begin
            proc.yield(*argv)
          rescue Exception => e
            exceptions << e
          end
        end
      ensure
        exceptions.raise
      end
    end
  end

  def deprecated klass
    # このメソッドは警告を表示する。デバッグ用。
    synchronize do
      obj = ivar? klass
      if obj
        logger :debug, "Deprecation warning.  Method %s called from %s", klass, caller[3]
      else
        raise RuntimeError, "found a timing issue. please report to @shyouhei with a reproducible sample code."
      end
      return obj
    end
  end

  # @yields [obj] yields generated object
  def ensures klass
    raise "eventmachine's reactor needed" unless EM.reactor_running?
    # このメソッドはEM.deferでklassの初期化を待つ。EM.deferだから戻り値を使ってはいけない。引数のブロックは、klassが初期化されたことが確
    # 認された後にcallされる。
    p1 = lambda do
      synchronize do
        unless ivar? klass
          setups klass unless @setting_up[klass]
          @condvar.wait @mutex until ivar? klass
        end
      end
    end
    p2 = lambda do |a|
      obj = ivar? klass
      yield obj if block_given?
    end
    EM.defer p1, p2
  end

  def generate_connection cb
    cfg = cb.merge @config[:connection] do |k, v1, v2| v2 end
    AMQP.connect cfg do |conn|
      synchronize do
        @state = :connected
      end
      yield conn
    end
  rescue AMQP::TCPConnectionFailed
    # on_tcp_connection_failrueは指定しているのだけれどそれでもこの例外はあがってくるのだろうか? よくわからない
    # いちおう同じことをさせておく
    cfg[:on_tcp_connection_failrue].yield cfg
  end

  def generate_channel *;
    cfg = @config[:channel]
    ensures :connection do |conn|
      id = AMQP::Channel.next_channel_id
      AMQP::Channel.new conn, id, cfg do |ch|
        yield ch
      end
    end
  end

  def generate_queue *;
    cfg = @config[:queue].dup
    name = cfg.delete :name
    ensures :exchange do |xchg|
      if cfg[:nowait]
        que = @channel.queue name, cfg
        que.bind xchg
        yield que
      else
        @channel.queue name, cfg do |que|
          que.bind xchg, :nowait => false do
            yield que
          end
        end
      end
    end
  end

  def generate_exchange *;
    cfg = @config[:exchange].dup
    name = cfg.delete :name
    type = cfg.delete :type
    cfg.delete :publish # not needed here
    ensures :channel do |ch|
      if cfg[:nowait]
        xchg = AMQP::Exchange.new ch, type.intern, name, cfg
        yield xchg
      else
        AMQP::Exchange.new ch, type.intern, name, cfg do |xchg|
          yield xchg
        end
      end
    end
  end

  def hooks_basic
    %w[
      before_recovery
      after_recovery
      on_connection_interruption
    ]
  end
  alias hooks_queue hooks_basic
  alias hooks_exchange hooks_queue

  def hooks_channel
    hooks_basic + %w[on_error]
  end

  def hooks_connection
    hooks_channel + %w[
      on_closed
      on_possible_authentication_failure
      on_tcp_connection_failure
      on_tcp_connection_loss
    ]
  end

  @@is_under_rspec = \
    begin
      RSpec
    rescue NameError
      false
    end

  def setups klass
    # このメソッドはvidの初期化を実際に行う。mutexは確保されている前提である。
    @setting_up[klass] = true
    mids = send "hooks_#{klass}"
    callbacks = mids.inject Hash.new do |r, x|
      y = x.intern
      cb = callback_entity klass, y
      r.update y => cb
    end

    send "generate_#{klass}", callbacks do |obj|
      callbacks.each_pair do |k, v|
        if @@is_under_rspec
          begin
            obj.send k, &v
          rescue RSpec::Mocks::MockExpectationError
            # objはmock objectかも...
          end
        else
          obj.send k, &v
        end
      end

      unless @@is_under_rspec
        # rspec ではないとき(テスト以外)はundefしておく
        # 本当はテスト時もundefしたいが...
        eigen = class << obj; self; end
        eigen.send :undef_method, *mids
      end

      synchronize do
        instance_variable_set "@#{klass}", obj
        @condvar.broadcast
      end
    end
  end

  #######

  def ensures_handshake
    raise ArgumentError, "no block given" unless block_given?
    raise "eventmachine's reactor needed" unless EM.reactor_running?
    case @state when :established, :unsupported
      EM.next_tick do yield end
    else
      # 2段階のEM.deferを行っている。まず初段で@channelの初期化をキックして、channel -> connectionと依存関係をたぐってコネクションを確立
      # する。channelを成立させるところまでの待ち合わせが第一段。次に、生成した@channelを用いてpublisher confirmationのハンドシェイクを
      # キックして、これが確立するのを待つのが第二段。
      logger :info, "waiting for MQ to be set up (now %s)...", @state

      d4 = lambda do |a|
             yield
           end
      d3 = lambda do
             synchronize do
               unless ensures_handshake_internal
                 setups_handshake unless @setting_up[:handshake]
                 # @condvar.wait @mutex until ensures_handshake_internal
                 @mutex.sleep 0.1 until ensures_handshake_internal
               end
             end
           end
      d2 = lambda do |a|
             EM.defer d3, d4
           end
      d1 = lambda do
             synchronize do
               ensures :channel
               @condvar.wait @mutex until ivar? :channel
             end
           end
      d0 = lambda do
             EM.defer d1, d2
           end
      d0.call
    end
  end

  def ensures_handshake_internal
    case @state when :established, :unsupported
      true
    else
      false
    end
  end

  def setups_handshake
    @setting_up[:handshake] = true
    # possible values of @state:
    #
    # :uninitialized --- not connected yet
    # :disconnected  --- connection lost and still not recovered
    # :connected     --- AMQP session established (no handshake yet)
    # :handshaking   --- handshake in progress, not established yet
    # :established   --- proper handshake was made
    # :unsupported   --- peer rejected handshake, but the connection itself is OK.
    cap = @connection.server_capabilities
    if cap and cap["publisher_confirms"] then
      @state = :handshaking
      @channel.confirm_select do
        # this is in next EM loop...
        synchronize do
          reinvoke_retry_timers unless @retrying_events.empty?
          @channel.on_ack do |ack|
            # this is in another EM loop...
            consume_basic_ack ack 
          end
          @channel.on_nack do |ack|
            # this is in yet another EM loop...
            consume_basic_nack ack 
          end
          @tag = 0
          @state = :established
          @setting_up.delete :handshake
          @condvar.broadcast
        end
      end
    else
      logger :warn, <<-end

The  message  queue  broker   you  are  connecting   lacks  Publisher [BEWARE!]
Confirmation capability,  so we cannot make  sure your events  are in [BEWARE!]
fact reaching  to one  of the Tengine  cores.  We strongly  recommend [BEWARE!]
you to use a relatively recent version of RabbitMQ.                   [BEWARE!]

      end
      @state = :unsupported
      @setting_up.delete :handshake
      @condvar.broadcast
    end
  end

  def consume_basic_ack ack
    synchronize do
      f = @publishing_events.empty?
      n = ack.delivery_tag
      ok = []
      ng = []
      if ack.multiple
        ok, @publishing_events = @publishing_events.partition {|i| i.tag <= n }
      else
        ng, @publishing_events = @publishing_events.partition {|i| i.tag < n }
        if @publishing_events.empty?
          # the packet in quesion is lost?
        elsif ev = @publishing_events.shift
          ok = [ev]
        end
      end
      f ||= !ng.empty?
      ng.each_next_tick do |e|
        synchronize do
          # NGなので再送
          e.retry += 1
          rehash_them_all
          @firing_queue.push e
        end
      end
      ok.each_next_tick do |e|
        # OK, ブロックを評価
        e.block.call if e.block
      end
      unless ok.empty?
        rehash_them_all
        ok.each do |e|
          # ただしく停止させるために上のnext_tickではなくここで
          @retrying_events.delete e
          @pending_events.delete e
        end
        @condvar.broadcast
        f = ok.inject f do |r, e| r | e.opts[:keep_connection] end
      end
      # 帰ってきたackが最後のackで、もう待ちがなくて、かつackに対応するイベントがすべてkeep_connection: falseで送信されていた場合、もう
      # このリアクターは止めていいい。ngが空でなければ@pending_eventsには何か入っている。
      stop if f == false and @pending_events.empty?
    end
  end

  def consume_basic_nack nack
    # nackされたら(再送するから)止まっちゃだめ。なので逆にフローはシンプル。
    synchronize do
      n = nack.delivery_tag
      ng = []
      if ack.multiple
        ng, @publishing_events = @publishing_events.partition {|i| i.tag <= n }
      else
        ng, @publishing_events = @publishing_events.partition {|i| i.tag < n }
        if @publishing_events.empty?
          # the packet in quesion is lost?
        elsif ev = @publishing_events.shift
          ng = [ev]
        end
      end
      ng.each_next_tick do |e|
        synchronize do
          e.retry += 1
          rehash_them_all
          @firing_queue.push e
        end
      end
    end
  end

  #######

  def revoke_retry_timers
    synchronize do
      if @state != :disconnected
        @state = :disconnected
        @retrying_events.each_value do |(idx, *)|
          EM.stop_timer idx if idx
        end
        # all unacknowledged events are hereby considered LOST
        t0 = Time.at 0
        @publishing_events.each do |e|
          @retrying_events[e] = [nil, t0]
        end
      end
    end
  end

  def reinvoke_retry_timers
    synchronize do
      @retrying_events.each_pair.to_a.each_next_tick do |i, (j, k)|
        u = (k + (i.opts[:retry_interval] || 0)) - Time.now
        if u < 0
          # retry interval passed, just send it again
          @firing_queue.push i
        else
          # need to re-add timer (no repeat)
          EM.add_timer u do @firing_queue.push i end
        end
      end
      @retrying_events.clear
      trigger_firing_thread
    end
  end

  def install_default_hooks
    add_hook :everything do |klass, mid, *argv|
      logger :debug, "AMQP event callback: %s.%s", klass, mid#, argv
    end

    add_hook :'connection.before_recovery' do |conn|
      EM.cancel_timer @reconnection_timer if ivar? :reconnection_timer
    end

    add_hook :'connection.on_tcp_connection_loss' do |conn|
      # Wow! AMQP::Session#reconnect is brain-damaged that it cannot be cancelled!
      # conn.reconnect false, auto_reconnect_delay.to_i if auto_reconnect_delay and not @terminating

      ard  = @config[:connection][:auto_reconnect_delay]
      host = @config[:connection][:host]
      port = @config[:connection][:port]
      if ard and not @terminating and conn.closed?
        conn.instance_eval { @reconnecting = true; reset }
        @reconnection_timer = EM.add_timer ard do
          EM.reconnect host, port, conn
          @reconnection_timer = nil
        end
      end
    end

    add_hook :'connection.on_tcp_connection_failure' do |setting|
      @mutex.synchronize do
        case @state when :uninitialized then
          # 最初の接続に失敗した場合。https://www.pivotaltracker.com/story/show/18317933
          raise "It seems the MQ broker is missing (misconfiguration?)"
        else
          logger :error, "It seems the MQ broker is missing."
        end
      end
    end

    add_hook :'channel.on_connection_interruption' do |ch|
      revoke_retry_timers
    end

    add_hook :'channel.after_recovery' do |ch|
      # AMAZING that an AMQP::Channel instance deletes a once-registered callbacks!
      # see: amq/client/async/channel.rb, search for "def reset_state!"
      hooks_channel.inject(Hash.new) {|r, x|
        r.update x.intern => callback_entity(:channel, x.intern)
      }.each_pair {|k, v|
        ch.send(k, &v)
      }

      ch.prefetch @config[:channel][:prefetch] do
        @setting_up.delete :handshake # re-initialize
        reinvoke_retry_timers
        @condvar.broadcast
      end
    end

    add_hook :'connection.after_recovery' do |conn|
      synchronize do
        @state = :connected
      end
    end
  end

  #######

  def stop_firing_queue
    # beautiful...
    @firing_queue.instance_eval do
      @items.clear
      @popq.clear
    end
  end

  def gencb
    @gencb ||= lambda do |ev|
      case @state when :unsupported, :established
        fire_internal ev
        @firing_queue.pop(&gencb)
      else
        # disconnectedとか。
        # 無視?
        @firing_queue.push ev
      end
    end
  end

  def trigger_firing_thread
    # inside mutex
    # event already pushed
    ensures_handshake do
      ensures :exchange do
        synchronize do
          @firing_queue.pop(&gencb)
        end
      end
    end
  end

  def fire_internal ev
    publish ev
  rescue Exception => ex
    # exchange.publish はたとえば RuntimeError を raise したりするようだ
    publish_failed ev, ex
  else
    published ev
  end

  def publish ev
    @exchange.publish ev.event.to_json, @config[:exchange][:publish]
  end

  def publish_failed ev, ex
    if resendable_p ev
      idx = EM.add_timer ev.opts[:retry_interval] do
        synchronize do
          ev.retry += 1
          @firing_queue.push ev
        end
      end
      @retrying_events[ev] = [idx, Time.now]
    else
      # inside mutex lock
      rehash_them_all
      @retrying_events.delete ev
      @pending_events.delete ev
      @publishing_events.reject! {|i| i == ev }
      @condvar.broadcast
      logger :fatal, "SEND FAILED: EVENT LOST %p", ev.event
      stop unless ev.opts[:keep_connection] # 送信失敗かつコネクション維持しないということはここで停止すべき
    end
  end

  def resendable_p ev
    return false if @terminating and not @connection
    return false if @terminating and not @connection.connected?
    return ev.retry < ev.opts[:retry_count]
  end

  def published ev
    case @state
    when :unsupported then
      # ackなし、next_tickをもって送信終了と見なす
      EM.next_tick do
        synchronize do
          rehash_them_all
          @retrying_events.delete ev
          @pending_events.delete ev
          @publishing_events.reject! {|i| i == ev }
          @condvar.broadcast
          ev.block.call if ev.block
          stop unless ev.opts[:keep_connection]
        end
      end
    when :established then
      # ackあり、ackを待つ
      @tag += 1
      ev.tag = @tag
      rehash_them_all
      @publishing_events.push ev
    end
  end
end

# 
# Local Variables:
# mode: ruby
# coding: utf-8-unix
# indent-tabs-mode: nil
# tab-width: 4
# ruby-indent-level: 2
# fill-column: 135
# default-justification: full
# End: