# encoding: utf-8

module AMQ
  module Client
    module Extensions
      module RabbitMQ
        # h2. Purpose
        # In case that the broker crashes, some messages can get lost.
        # Thanks to this extension, broker sends Basic.Ack when the message
        # is processed by the broker. In case of persistent messages, it must
        # be written to disk or ack'd on all the queues it was delivered to.
        # However it doesn't have to be necessarily 1:1, because the broker
        # can send Basic.Ack with multi flag to acknowledge multiple messages.
        #
        # So it provides clients a lightweight way of keeping track of which
        # messages have been processed by the broker and which would need
        # re-publishing in case of broker shutdown or network failure.
        #
        # Transactions are solving the same problem, but they are very slow:
        # confirmations are more than 100 times faster.
        #
        # h2. Workflow
        # * Client asks broker to confirm messages on given channel (Confirm.Select).
        # * Broker sends back Confirm.Select-Ok, unless we sent Confirm.Select with nowait=true.
        # * After each published message, the client receives Basic.Ack from the broker.
        # * If something bad happens inside the broker, it sends Basic.Nack.
        #
        # h2. Gotchas
        # Note that we don't keep track of messages awaiting confirmation.
        # It'd add a huge overhead and it's impossible to come up with one-suits-all solution.
        # If you want to create such module, you'll probably want to redefine Channel#after_publish,
        # so it will put messages into a queue and then handlers for Basic.Ack and Basic.Nack.
        # This is the reason why we pass every argument from Exchange#publish to Channel#after_publish.
        # You should not forget though, that both of these methods can have multi flag!
        #
        # Transactional channel cannot be put into confirm mode and a confirm
        # mode channel cannot be made transactional.
        #
        # If the connection between the publisher and broker drops with outstanding
        # confirms, it does not necessarily mean that the messages were lost, so
        # republishing may result in duplicate messages.

        # h2. Learn more
        # @see http://www.rabbitmq.com/blog/2011/02/10/introducing-publisher-confirms
        # @see http://www.rabbitmq.com/amqp-0-9-1-quickref.html#class.confirm
        # @see http://www.rabbitmq.com/amqp-0-9-1-reference.html#basic.ack
        module Confirm
          module ChannelMixin

            # Change publisher index. Publisher index is incremented
            # by 1 after each Basic.Publish starting at 1. This is done
            # on both client and server, hence this acknowledged messages
            # can be matched via its delivery-tag.
            #
            # @api private
            attr_writer :publisher_index

            # Publisher index is an index of the last message since
            # the confirmations were activated, started with 1. It's
            # incremented by 1 after each Basic.Publish starting at 1.
            # This is done on both client and server, hence this
            # acknowledged messages can be matched via its delivery-tag.
            #
            # @return [Integer] Current publisher index.
            # @api public
            def publisher_index
              @publisher_index ||= 1
            end

            # Resets publisher index to 0
            #
            # @api plugin
            def reset_publisher_index!
              @publisher_index = 0
            end


            # This method is executed after publishing of each message via {Exchage#publish}.
            # Currently it just increments publisher index by 1, so messages
            # can be actually matched.
            #
            # @api plugin
            def after_publish(*args)
              self.publisher_index += 1
            end

            # Turn on confirmations for this channel and, if given,
            # register callback for Confirm.Select-Ok.
            #
            # @raise [RuntimeError] Occurs when confirmations are already activated.
            # @raise [RuntimeError] Occurs when nowait is true and block is given.
            #
            # @param [Boolean] nowait Whether we expect Confirm.Select-Ok to be returned by the broker or not.
            # @yield [method] Callback which will be executed once we receive Confirm.Select-Ok.
            # @yieldparam [AMQ::Protocol::Confirm::SelectOk] method Protocol method class instance.
            #
            # @return [self] self.
            #
            # @see #confirm
            def confirm_select(nowait = false, &block)
              if nowait && block
                raise "You can't use Confirm.Select with nowait=true and a callback at the same time."
              end

              @uses_publisher_confirmations = true
              self.redefine_callback(:confirm_select, &block)
              @client.send(Protocol::Confirm::Select.encode(@id, nowait))

              self
            end

            # @return [Boolean]
            def uses_publisher_confirmations?
              @uses_publisher_confirmations
            end # uses_publisher_confirmations?


            # Turn on confirmations for this channel and, if given,
            # register callback for basic.ack from the broker.
            #
            # @raise [RuntimeError] Occurs when confirmations are already activated.
            # @raise [RuntimeError] Occurs when nowait is true and block is given.
            # @param [Boolean] nowait Whether we expect Confirm.Select-Ok to be returned by the broker or not.
            #
            # @yield [basick_ack] Callback which will be executed every time we receive Basic.Ack from the broker.
            # @yieldparam [AMQ::Protocol::Basic::Ack] basick_ack Protocol method class instance.
            #
            # @return [self] self.
            def on_ack(nowait = false, &block)
              self.use_publisher_confirmations! unless self.uses_publisher_confirmations?

              self.define_callback(:ack, &block) if block

              self
            end


            # Register error callback for Basic.Nack. It's called
            # when message(s) is rejected.
            #
            # @return [self] self
            def on_nack(&block)
              self.define_callback(:nack, &block) if block

              self
            end




            # Handler for Confirm.Select-Ok. By default, it just
            # executes hook specified via the #confirmations method
            # with a single argument, a protocol method class
            # instance (an instance of AMQ::Protocol::Confirm::SelectOk)
            # and then it deletes the callback, since Confirm.Select
            # is supposed to be sent just once.
            #
            # @api plugin
            def handle_select_ok(method)
              self.exec_callback_once(:confirm_select, method)
            end

            # Handler for Basic.Ack. By default, it just
            # executes hook specified via the #confirm method
            # with a single argument, a protocol method class
            # instance (an instance of AMQ::Protocol::Basic::Ack).
            #
            # @api plugin
            def handle_basic_ack(method)
              self.exec_callback(:ack, method)
            end


            # Handler for Basic.Nack. By default, it just
            # executes hook specified via the #confirm_failed method
            # with a single argument, a protocol method class
            # instance (an instance of AMQ::Protocol::Basic::Nack).
            #
            # @api plugin
            def handle_basic_nack(method)
              self.exec_callback(:nack, method)
            end


            def reset_state!
              super

              @uses_publisher_confirmations = false
            end


            def self.included(host)
              host.handle(Protocol::Confirm::SelectOk) do |client, frame|
                method  = frame.decode_payload
                channel = client.connection.channels[frame.channel]
                channel.handle_select_ok(method)
              end

              host.handle(Protocol::Basic::Ack) do |client, frame|
                method  = frame.decode_payload
                channel = client.connection.channels[frame.channel]
                channel.handle_basic_ack(method)
              end

              host.handle(Protocol::Basic::Nack) do |client, frame|
                method  = frame.decode_payload
                channel = client.connection.channels[frame.channel]
                channel.handle_basic_nack(method)
              end
            end # self.included(host)
          end # ChannelMixin


          module ExchangeMixin
            # Publish message and then run #after_publish on channel belonging
            # to the exchange. This is used for incrementing the publisher index.
            #
            # @api public
            # @see AMQ::Client::Exchange#publish
            # @see AMQ::Client::Extensions::RabbitMQ::Channel#publisher_index
            # @return [self] self
            def publish(*args)
              super(*args)
              @channel.after_publish(*args)

              self
            end # publish
          end # ExchangeMixin
        end # Confirm
      end # RabbitMQ
    end # Extensions


    class Channel
      # use modules, a native Ruby way of extension of existing classes,
      # instead of reckless monkey-patching. MK.
      include Extensions::RabbitMQ::Confirm::ChannelMixin
    end # Channel

    class Exchange
      # use modules, a native Ruby way of extension of existing classes,
      # instead of reckless monkey-patching. MK.
      include Extensions::RabbitMQ::Confirm::ExchangeMixin
    end # Exchange
  end # Client
end # AMQ