lib/bunny/channel.rb in bunny-0.9.0.pre6 vs lib/bunny/channel.rb in bunny-0.9.0.pre7
- old
+ new
@@ -1,5 +1,6 @@
+# -*- coding: utf-8 -*-
require "thread"
require "set"
require "bunny/consumer_work_pool"
@@ -9,20 +10,149 @@
require "bunny/delivery_info"
require "bunny/return_info"
require "bunny/message_properties"
module Bunny
+ # ## What are AMQP channels
+ #
+ # To quote {http://www.rabbitmq.com/resources/specs/amqp0-9-1.pdf AMQP 0.9.1 specification}:
+ #
+ # AMQP 0.9.1 is a multi-channelled protocol. Channels provide a way to multiplex
+ # a heavyweight TCP/IP connection into several light weight connections.
+ # This makes the protocol more “firewall friendly” since port usage is predictable.
+ # It also means that traffic shaping and other network QoS features can be easily employed.
+ # Channels are independent of each other and can perform different functions simultaneously
+ # with other channels, the available bandwidth being shared between the concurrent activities.
+ #
+ #
+ # ## Opening Channels
+ #
+ # Channels can be opened either via `Bunny::Session#create_channel` (sufficient in the majority
+ # of cases) or by instantiating `Bunny::Channel` directly:
+ #
+ # @example Using {Bunny::Session#create_channel}:
+ # conn = Bunny.new
+ # conn.start
+ #
+ # ch = conn.create_channel
+ #
+ # This will automatically allocate channel id.
+ #
+ # @example Instantiating
+ #
+ # ## Closing Channels
+ #
+ # Channels are closed via {Bunny::Channel#close}. Channels that get a channel-level exception are
+ # closed, too. Closed channels can no longer be used. Attempts to use them will raise
+ # {Bunny::ChannelAlreadyClosed}.
+ #
+ # @example
+ #
+ # ch = conn.create_channel
+ # ch.close
+ #
+ # ## Higher-level API
+ #
+ # Bunny offers two sets of methods on {Bunny::Channel}: known as higher-level and lower-level
+ # APIs, respectively. Higher-level API mimics {http://rubyamqp.info amqp gem} API where
+ # exchanges and queues are objects (instance of {Bunny::Exchange} and {Bunny::Queue}, respectively).
+ # Lower-level API is built around AMQP 0.9.1 methods (commands), where queues and exchanges are
+ # passed as strings (à la RabbitMQ Java client, {http://clojurerabbitmq.info Langohr} and Pika).
+ #
+ # ### Queue Operations In Higher-level API
+ #
+ # * {Bunny::Channel#queue} is used to declare queues. The rest of the API is in {Bunny::Queue}.
+ #
+ #
+ # ### Exchange Operations In Higher-level API
+ #
+ # * {Bunny::Channel#topic} declares a topic exchange. The rest of the API is in {Bunny::Exchange}.
+ # * {Bunny::Channel#direct} declares a direct exchange.
+ # * {Bunny::Channel#fanout} declares a fanout exchange.
+ # * {Bunny::Channel#headers} declares a headers exchange.
+ # * {Bunny::Channel#default_exchange}
+ # * {Bunny::Channel#exchange} is used to declare exchanges with type specified as a symbol or string.
+ #
+ #
+ # ## Channel Qos (Prefetch Level)
+ #
+ # It is possible to control how many messages at most a consumer will be given (before it acknowledges
+ # or rejects previously consumed ones). This setting is per channel and controlled via {Bunny::Channel#prefetch}.
+ #
+ #
+ # ## Channel IDs
+ #
+ # Channels are identified by their ids which are integers. Bunny takes care of allocating and
+ # releasing them as channels are opened and closed. It is almost never necessary to specify
+ # channel ids explicitly.
+ #
+ # There is a limit on the maximum number of channels per connection, usually 65536. Note
+ # that allocating channels is very cheap on both client and server so having tens, hundreds
+ # or even thousands of channels is not a problem.
+ #
+ # ## Channels and Error Handling
+ #
+ # Channel-level exceptions are more common than connection-level ones and often indicate
+ # issues applications can recover from (such as consuming from or trying to delete
+ # a queue that does not exist).
+ #
+ # With Bunny, channel-level exceptions are raised as Ruby exceptions, for example,
+ # {Bunny::NotFound}, that provide access to the underlying `channel.close` method
+ # information.
+ #
+ # @example Handling 404 NOT_FOUND
+ # begin
+ # ch.queue_delete("queue_that_should_not_exist#{rand}")
+ # rescue Bunny::NotFound => e
+ # puts "Channel-level exception! Code: #{e.channel_close.reply_code}, message: #{e.channel_close.reply_text}"
+ # end
+ #
+ # @example Handling 406 PRECONDITION_FAILED
+ # begin
+ # ch2 = conn.create_channel
+ # q = "bunny.examples.recovery.q#{rand}"
+ #
+ # ch2.queue_declare(q, :durable => false)
+ # ch2.queue_declare(q, :durable => true)
+ # rescue Bunny::PreconditionFailed => e
+ # puts "Channel-level exception! Code: #{e.channel_close.reply_code}, message: #{e.channel_close.reply_text}"
+ # ensure
+ # conn.create_channel.queue_delete(q)
+ # end
+ #
+ # @see http://www.rabbitmq.com/tutorials/amqp-concepts.html AMQP 0.9.1 Model Concepts Guide
+ # @see http://rubybunny.info/articles/getting_started.html Getting Started with RabbitMQ Using Bunny
+ # @see http://rubybunny.info/articles/error_handling.html Error Handling and Recovery Guide
class Channel
#
# API
#
+ # @return [Integer] Channel id
+ attr_accessor :id
+ # @return [Bunny::Session] AMQP connection this channel was opened on
+ attr_reader :connection
+ attr_reader :status
+ # @return [Bunny::ConsumerWorkPool] Thread pool delivered messages are dispatched to.
+ attr_reader :work_pool
+ # @return [Integer] Next publisher confirmations sequence index
+ attr_reader :next_publish_seq_no
+ # @return [Hash<String, Bunny::Queue>] Queue instances declared on this channel
+ attr_reader :queues
+ # @return [Hash<String, Bunny::Exchange>] Exchange instances declared on this channel
+ attr_reader :exchanges
+ # @return [Set<Integer>] Set of published message indexes that are currently unconfirmed
+ attr_reader :unconfirmed_set
+ # @return [Set<Integer>] Set of nacked message indexes that have been nacked
+ attr_reader :nacked_set
+ # @return [Hash<String, Bunny::Consumer>] Consumer instances declared on this channel
+ attr_reader :consumers
- attr_accessor :id, :connection, :status, :work_pool
- attr_reader :next_publish_seq_no, :queues, :exchanges, :unconfirmed_set, :consumers
-
+ # @param [Bunny::Session] connection AMQP 0.9.1 connection
+ # @param [Integer] id Channel id, pass nil to make Bunny automatically allocate it
+ # @param [Bunny::ConsumerWorkPool] work_pool Thread pool for delivery processing, by default of size 1
def initialize(connection = nil, id = nil, work_pool = ConsumerWorkPool.new(1))
@connection = connection
@id = id || @connection.next_channel_id
@status = :opening
@@ -37,146 +167,343 @@
@publishing_mutex = Mutex.new
@consumer_mutex = Mutex.new
@unconfirmed_set_mutex = Mutex.new
- @continuations = ::Queue.new
- @confirms_continuations = ::Queue.new
+ @continuations = ::Queue.new
+ @confirms_continuations = ::Queue.new
+ @basic_get_continuations = ::Queue.new
+ # threads awaiting on continuations. Used to unblock
+ # them when network connection goes down so that busy loops
+ # that perform synchronous operations can work. MK.
+ @threads_waiting_on_continuations = Set.new
+ @threads_waiting_on_confirms_continuations = Set.new
+ @threads_waiting_on_basic_get_continuations = Set.new
@next_publish_seq_no = 0
end
+ def read_write_timeout
+ @connection.read_write_timeout
+ end
+
+ # Opens the channel and resets its internal state
+ # @return [Bunny::Channel] Self
+ # @api public
def open
+ @threads_waiting_on_continuations = Set.new
+ @threads_waiting_on_confirms_continuations = Set.new
+ @threads_waiting_on_basic_get_continuations = Set.new
+
@connection.open_channel(self)
# clear last channel error
@last_channel_error = nil
@status = :open
self
end
+ # Closes the channel. Closed channels can no longer be used (this includes associated
+ # {Bunny::Queue}, {Bunny::Exchange} and {Bunny::Consumer} instances.
+ # @api public
def close
@connection.close_channel(self)
closed!
end
+ # @return [Boolean] true if this channel is open, false otherwise
+ # @api public
def open?
@status == :open
end
+ # @return [Boolean] true if this channel is closed (manually or because of an exception), false otherwise
+ # @api public
def closed?
@status == :closed
end
- def queue(name = AMQ::Protocol::EMPTY_STRING, opts = {})
- q = find_queue(name) || Bunny::Queue.new(self, name, opts)
- register_queue(q)
- end
-
-
#
- # Backwards compatibility with 0.8.0
+ # @group Backwards compatibility with 0.8.0
#
+ # @return [Integer] Channel id
def number
self.id
end
+ # @return [Boolean] true if this channel is open
def active
- @active
+ open?
end
+ # @return [Bunny::Session] Connection this channel was opened on
def client
@connection
end
+ # @private
def frame_size
@connection.frame_max
end
+ # @endgroup
+
#
# Higher-level API, similar to amqp gem
#
+ # @group Higher-level API for exchange operations
+
+ # Declares a fanout exchange or looks it up in the cache of previously
+ # declared exchanges.
+ #
+ # @param [String] name Exchange name
+ # @param [Hash] opts Exchange parameters
+ #
+ # @option opts [Boolean] :durable (false) Should the exchange be durable?
+ # @option opts [Boolean] :auto_delete (false) Should the exchange be automatically deleted when no longer in use?
+ # @option opts [Hash] :arguments ({}) Optional exchange arguments (used by RabbitMQ extensions)
+ #
+ # @return [Bunny::Exchange] Exchange instance
+ # @see http://rubybunny.info/articles/exchanges.html Exchanges and Publishing guide
+ # @see http://rubybunny.info/articles/extensions.html RabbitMQ Extensions to AMQP 0.9.1 guide
+ # @api public
def fanout(name, opts = {})
Exchange.new(self, :fanout, name, opts)
end
+ # Declares a direct exchange or looks it up in the cache of previously
+ # declared exchanges.
+ #
+ # @param [String] name Exchange name
+ # @param [Hash] opts Exchange parameters
+ #
+ # @option opts [Boolean] :durable (false) Should the exchange be durable?
+ # @option opts [Boolean] :auto_delete (false) Should the exchange be automatically deleted when no longer in use?
+ # @option opts [Hash] :arguments ({}) Optional exchange arguments (used by RabbitMQ extensions)
+ #
+ # @return [Bunny::Exchange] Exchange instance
+ # @see http://rubybunny.info/articles/exchanges.html Exchanges and Publishing guide
+ # @see http://rubybunny.info/articles/extensions.html RabbitMQ Extensions to AMQP 0.9.1 guide
+ # @api public
def direct(name, opts = {})
Exchange.new(self, :direct, name, opts)
end
+ # Declares a topic exchange or looks it up in the cache of previously
+ # declared exchanges.
+ #
+ # @param [String] name Exchange name
+ # @param [Hash] opts Exchange parameters
+ #
+ # @option opts [Boolean] :durable (false) Should the exchange be durable?
+ # @option opts [Boolean] :auto_delete (false) Should the exchange be automatically deleted when no longer in use?
+ # @option opts [Hash] :arguments ({}) Optional exchange arguments (used by RabbitMQ extensions)
+ #
+ # @return [Bunny::Exchange] Exchange instance
+ # @see http://rubybunny.info/articles/exchanges.html Exchanges and Publishing guide
+ # @see http://rubybunny.info/articles/extensions.html RabbitMQ Extensions to AMQP 0.9.1 guide
+ # @api public
def topic(name, opts = {})
Exchange.new(self, :topic, name, opts)
end
+ # Declares a headers exchange or looks it up in the cache of previously
+ # declared exchanges.
+ #
+ # @param [String] name Exchange name
+ # @param [Hash] opts Exchange parameters
+ #
+ # @option opts [Boolean] :durable (false) Should the exchange be durable?
+ # @option opts [Boolean] :auto_delete (false) Should the exchange be automatically deleted when no longer in use?
+ # @option opts [Hash] :arguments ({}) Optional exchange arguments
+ #
+ # @return [Bunny::Exchange] Exchange instance
+ # @see http://rubybunny.info/articles/exchanges.html Exchanges and Publishing guide
+ # @see http://rubybunny.info/articles/extensions.html RabbitMQ Extensions to AMQP 0.9.1 guide
+ # @api public
def headers(name, opts = {})
Exchange.new(self, :headers, name, opts)
end
+ # Provides access to the default exchange
+ # @see http://rubybunny.info/articles/exchanges.html Exchanges and Publishing guide
+ # @api public
def default_exchange
self.direct(AMQ::Protocol::EMPTY_STRING, :no_declare => true)
end
+ # Declares a headers exchange or looks it up in the cache of previously
+ # declared exchanges.
+ #
+ # @param [String] name Exchange name
+ # @param [Hash] opts Exchange parameters
+ #
+ # @option opts [Boolean] :durable (false) Should the exchange be durable?
+ # @option opts [Boolean] :auto_delete (false) Should the exchange be automatically deleted when no longer in use?
+ # @option opts [Hash] :arguments ({}) Optional exchange arguments
+ #
+ # @return [Bunny::Exchange] Exchange instance
+ # @see http://rubybunny.info/articles/exchanges.html Exchanges and Publishing guide
+ # @see http://rubybunny.info/articles/extensions.html RabbitMQ Extensions to AMQP 0.9.1 guide
def exchange(name, opts = {})
Exchange.new(self, opts.fetch(:type, :direct), name, opts)
end
+ # @endgroup
+
+
+ # @group Higher-level API for queue operations
+
+ # Declares an exchange or looks it up in the per-channel cache.
+ #
+ # @param [String] name Queue name. Pass an empty string to declare a server-named queue (make RabbitMQ generate a unique name).
+ # @param [Hash] opts Queue properties and other options
+ #
+ # @option options [Boolean] :durable (false) Should this queue be durable?
+ # @option options [Boolean] :auto-delete (false) Should this queue be automatically deleted when the last consumer disconnects?
+ # @option options [Boolean] :exclusive (false) Should this queue be exclusive (only can be used by this connection, removed when the connection is closed)?
+ # @option options [Boolean] :arguments ({}) Additional optional arguments (typically used by RabbitMQ extensions and plugins)
+ #
+ # @return [Bunny::Queue] Queue that was declared or looked up in the cache
+ # @see http://rubybunny.info/articles/queues.html Queues and Consumers guide
+ # @see http://rubybunny.info/articles/extensions.html RabbitMQ Extensions guide
+ # @api public
+ def queue(name = AMQ::Protocol::EMPTY_STRING, opts = {})
+ q = find_queue(name) || Bunny::Queue.new(self, name, opts)
+
+ register_queue(q)
+ end
+
+ # @endgroup
+
+
+ # @group QoS and Flow Control
+
+ # Sets how many messages will be given to consumers on this channel before they
+ # have to acknowledge or reject one of the previously consumed messages
+ #
+ # @param [Integer] prefetch_count Prefetch (QoS setting) for this channel
+ # @see http://rubybunny.info/articles/exchanges.html Exchanges and Publishing guide
+ # @see http://rubybunny.info/articles/queues.html Queues and Consumers guide
+ # @api public
def prefetch(prefetch_count)
self.basic_qos(prefetch_count, false)
end
+ # Flow control. When set to false, RabbitMQ will stop delivering messages on this
+ # channel.
+ #
+ # @param [Boolean] active Should messages to consumers on this channel be delivered?
+ # @api public
def flow(active)
channel_flow(active)
end
+ # Tells RabbitMQ to redeliver unacknowledged messages
+ # @api public
def recover(ignored = true)
# RabbitMQ only supports basic.recover with requeue = true
basic_recover(true)
end
+ # @endgroup
+
+
+
+ # @group Message acknowledgements
+
+ # Rejects a message. A rejected message can be requeued or
+ # dropped by RabbitMQ.
+ #
+ # @param [Integer] delivery_tag Delivery tag to reject
+ # @param [Boolean] requeue Should this message be requeued instead of dropping it?
+ # @see Bunny::Channel#ack
+ # @see Bunny::Channel#nack
+ # @see http://rubybunny.info/articles/queues.html Queues and Consumers guide
+ # @api public
def reject(delivery_tag, requeue = false)
basic_reject(delivery_tag, requeue)
end
+ # Acknowledges a message. Acknowledged messages are completely removed from the queue.
+ #
+ # @param [Integer] delivery_tag Delivery tag to acknowledge
+ # @param [Boolean] multiple (false) Should all unacknowledged messages up to this be acknowledged as well?
+ # @see Bunny::Channel#nack
+ # @see http://rubybunny.info/articles/queues.html Queues and Consumers guide
+ # @api public
def ack(delivery_tag, multiple = false)
basic_ack(delivery_tag, multiple)
end
alias acknowledge ack
+ # Rejects a message. A rejected message can be requeued or
+ # dropped by RabbitMQ. This method is similar to {Bunny::Channel#reject} but
+ # supports rejecting multiple messages at once, and is usually preferred.
+ #
+ # @param [Integer] delivery_tag Delivery tag to reject
+ # @param [Boolean] requeue Should this message be requeued instead of dropping it?
+ # @param [Boolean] multiple (false) Should all unacknowledged messages up to this be rejected as well?
+ # @see Bunny::Channel#ack
+ # @see http://rubybunny.info/articles/queues.html Queues and Consumers guide
+ # @api public
def nack(delivery_tag, requeue, multiple = false)
basic_nack(delivery_tag, requeue, multiple)
end
- def on_error(&block)
- @default_error_handler = block
- end
+ # @endgroup
- def using_publisher_confirmations?
- @next_publish_seq_no > 0
- end
-
#
# Lower-level API, exposes protocol operations as they are defined in the protocol,
# without any OO sugar on top, by design.
#
- # basic.*
+ # @group Consumer and Message operations (basic.*)
+ # Publishes a message using basic.publish AMQP 0.9.1 method.
+ #
+ # @param [String] payload Message payload. It will never be modified by Bunny or RabbitMQ in any way.
+ # @param [String] exchange Exchange to publish to
+ # @param [String] routing_key Routing key
+ # @param [Hash] opts Publishing options
+ #
+ # @option opts [Boolean] :persistent Should the message be persisted to disk?
+ # @option opts [Boolean] :mandatory Should the message be returned if it cannot be routed to any queue?
+ # @option opts [Integer] :timestamp A timestamp associated with this message
+ # @option opts [Integer] :expiration Expiration time after which the message will be deleted
+ # @option opts [String] :type Message type, e.g. what type of event or command this message represents. Can be any string
+ # @option opts [String] :reply_to Queue name other apps should send the response to
+ # @option opts [String] :content_type Message content type (e.g. application/json)
+ # @option opts [String] :content_encoding Message content encoding (e.g. gzip)
+ # @option opts [String] :correlation_id Message correlated to this one, e.g. what request this message is a reply for
+ # @option opts [Integer] :priority Message priority, 0 to 9. Not used by RabbitMQ, only applications
+ # @option opts [String] :message_id Any message identifier
+ # @option opts [String] :user_id Optional user ID. Verified by RabbitMQ against the actual connection username
+ # @option opts [String] :app_id Optional application ID
+ #
+ # @return [Bunny::Channel] Self
+ # @api public
def basic_publish(payload, exchange, routing_key, opts = {})
raise_if_no_longer_open!
exchange_name = if exchange.respond_to?(:name)
exchange.name
else
exchange
end
- meta = { :priority => 0, :delivery_mode => 2, :content_type => "application/octet-stream" }.
+ mode = if opts.fetch(:persistent, true)
+ 2
+ else
+ 1
+ end
+ meta = { :priority => 0, :delivery_mode => mode, :content_type => "application/octet-stream" }.
merge(opts)
if @next_publish_seq_no > 0
@unconfirmed_set.add(@next_publish_seq_no)
@next_publish_seq_no += 1
@@ -192,72 +519,261 @@
@connection.frame_max), self)
self
end
+ # Synchronously fetches a message from the queue, if there are any. This method is
+ # for cases when the convenience of synchronous operations is more important than
+ # throughput.
+ #
+ # @param [String] queue Queue name
+ # @param [Hash] opts Options
+ #
+ # @option opts [Boolean] :ack (true) Will this message be acknowledged manually?
+ #
+ # @return [Array] A triple of delivery info, message properties and message content
+ #
+ # @example Using Bunny::Channel#basic_get with manual acknowledgements
+ # conn = Bunny.new
+ # conn.start
+ # ch = conn.create_channel
+ # # here we assume the queue already exists and has messages
+ # delivery_info, properties, payload = ch.basic_get("bunny.examples.queue1", :ack => true)
+ # ch.acknowledge(delivery_info.delivery_tag)
+ # @see Bunny::Queue#pop
+ # @see http://rubybunny.info/articles/queues.html Queues and Consumers guide
+ # @api public
def basic_get(queue, opts = {:ack => true})
raise_if_no_longer_open!
- @connection.send_frame(AMQ::Protocol::Basic::Get.encode(@id, queue, !opts[:ack]))
- @last_basic_get_response = @continuations.pop
+ @connection.send_frame(AMQ::Protocol::Basic::Get.encode(@id, queue, !(opts[:ack])))
+ # this is a workaround for the edge case when basic_get is called in a tight loop
+ # and network goes down we need to perform recovery. The problem is, basic_get will
+ # keep blocking the thread that calls it without clear way to constantly unblock it
+ # from the network activity loop (where recovery happens) with the current continuations
+ # implementation (and even more correct and convenient ones, such as wait/notify, should
+ # we implement them). So we return a triple of nils immediately which apps should be
+ # able to handle anyway as "got no message, no need to act". MK.
+ @last_basic_get_response = if @connection.open?
+ wait_on_basic_get_continuations
+ else
+ [nil, nil, nil]
+ end
raise_if_continuation_resulted_in_a_channel_error!
@last_basic_get_response
end
+ # Controls message delivery rate using basic.qos AMQP 0.9.1 method.
+ #
+ # @param [Integer] prefetch_count How many messages can consumers on this channel be given at a time
+ # (before they have to acknowledge or reject one of the earlier received messages)
+ # @param [Boolean] global (false) Ignored, as it is not supported by RabbitMQ
+ # @return [AMQ::Protocol::Basic::QosOk] RabbitMQ response
+ # @see Bunny::Channel#prefetch
+ # @see http://rubybunny.info/articles/queues.html Queues and Consumers guide
+ # @api public
def basic_qos(prefetch_count, global = false)
raise ArgumentError.new("prefetch count must be a positive integer, given: #{prefetch_count}") if prefetch_count < 0
raise_if_no_longer_open!
@connection.send_frame(AMQ::Protocol::Basic::Qos.encode(@id, 0, prefetch_count, global))
- Bunny::Timer.timeout(1, ClientTimeout) do
- @last_basic_qos_ok = @continuations.pop
+ Bunny::Timer.timeout(read_write_timeout, ClientTimeout) do
+ @last_basic_qos_ok = wait_on_continuations
end
raise_if_continuation_resulted_in_a_channel_error!
@prefetch_count = prefetch_count
@last_basic_qos_ok
end
+ # Redeliver unacknowledged messages
+ #
+ # @param [Boolean] requeue Should messages be requeued?
+ # @return [AMQ::Protocol::Basic::RecoverOk] RabbitMQ response
+ # @api public
def basic_recover(requeue)
raise_if_no_longer_open!
@connection.send_frame(AMQ::Protocol::Basic::Recover.encode(@id, requeue))
- Bunny::Timer.timeout(1, ClientTimeout) do
- @last_basic_recover_ok = @continuations.pop
+ Bunny::Timer.timeout(read_write_timeout, ClientTimeout) do
+ @last_basic_recover_ok = wait_on_continuations
end
raise_if_continuation_resulted_in_a_channel_error!
@last_basic_recover_ok
end
+ # Rejects or requeues a message.
+ #
+ # @param [Integer] delivery_tag Delivery tag obtained from delivery info
+ # @param [Boolean] requeue Should the message be requeued?
+ # @return [NilClass] nil
+ #
+ # @example Requeue a message
+ # conn = Bunny.new
+ # conn.start
+ #
+ # ch = conn.create_channel
+ # q.subscribe do |delivery_info, properties, payload|
+ # # requeue the message
+ # ch.basic_reject(delivery_info.delivery_tag, true)
+ # end
+ #
+ # @example Reject a message
+ # conn = Bunny.new
+ # conn.start
+ #
+ # ch = conn.create_channel
+ # q.subscribe do |delivery_info, properties, payload|
+ # # requeue the message
+ # ch.basic_reject(delivery_info.delivery_tag, false)
+ # end
+ #
+ # @example Requeue a message fetched via basic.get
+ # conn = Bunny.new
+ # conn.start
+ #
+ # ch = conn.create_channel
+ # # we assume the queue exists and has messages
+ # delivery_info, properties, payload = ch.basic_get("bunny.examples.queue3", :ack => true)
+ # ch.basic_reject(delivery_info.delivery_tag, true)
+ #
+ # @see Bunny::Channel#basic_nack
+ # @see http://rubybunny.info/articles/queues.html Queues and Consumers guide
+ # @api public
def basic_reject(delivery_tag, requeue)
raise_if_no_longer_open!
@connection.send_frame(AMQ::Protocol::Basic::Reject.encode(@id, delivery_tag, requeue))
nil
end
+ # Acknowledges a delivery (message).
+ #
+ # @param [Integer] delivery_tag Delivery tag obtained from delivery info
+ # @param [Boolean] multiple Should all deliveries up to this one be acknowledged?
+ # @return [NilClass] nil
+ #
+ # @example Ack a message
+ # conn = Bunny.new
+ # conn.start
+ #
+ # ch = conn.create_channel
+ # q.subscribe do |delivery_info, properties, payload|
+ # # requeue the message
+ # ch.basic_ack(delivery_info.delivery_tag)
+ # end
+ #
+ # @example Ack a message fetched via basic.get
+ # conn = Bunny.new
+ # conn.start
+ #
+ # ch = conn.create_channel
+ # # we assume the queue exists and has messages
+ # delivery_info, properties, payload = ch.basic_get("bunny.examples.queue3", :ack => true)
+ # ch.basic_ack(delivery_info.delivery_tag)
+ #
+ # @example Ack multiple messages fetched via basic.get
+ # conn = Bunny.new
+ # conn.start
+ #
+ # ch = conn.create_channel
+ # # we assume the queue exists and has messages
+ # _, _, payload1 = ch.basic_get("bunny.examples.queue3", :ack => true)
+ # _, _, payload2 = ch.basic_get("bunny.examples.queue3", :ack => true)
+ # delivery_info, properties, payload3 = ch.basic_get("bunny.examples.queue3", :ack => true)
+ # # ack all fetched messages up to payload3
+ # ch.basic_ack(delivery_info.delivery_tag, true)
+ #
+ # @see http://rubybunny.info/articles/queues.html Queues and Consumers guide
+ # @api public
def basic_ack(delivery_tag, multiple)
raise_if_no_longer_open!
@connection.send_frame(AMQ::Protocol::Basic::Ack.encode(@id, delivery_tag, multiple))
nil
end
+ # Rejects or requeues messages just like {Bunny::Channel#basic_reject} but can do so
+ # with multiple messages at once.
+ #
+ # @param [Integer] delivery_tag Delivery tag obtained from delivery info
+ # @param [Boolean] requeue Should the message be requeued?
+ # @param [Boolean] multiple Should all deliveries up to this one be rejected/requeued?
+ # @return [NilClass] nil
+ #
+ # @example Requeue a message
+ # conn = Bunny.new
+ # conn.start
+ #
+ # ch = conn.create_channel
+ # q.subscribe do |delivery_info, properties, payload|
+ # # requeue the message
+ # ch.basic_nack(delivery_info.delivery_tag, true)
+ # end
+ #
+ # @example Reject a message
+ # conn = Bunny.new
+ # conn.start
+ #
+ # ch = conn.create_channel
+ # q.subscribe do |delivery_info, properties, payload|
+ # # requeue the message
+ # ch.basic_nack(delivery_info.delivery_tag, false)
+ # end
+ #
+ # @example Requeue a message fetched via basic.get
+ # conn = Bunny.new
+ # conn.start
+ #
+ # ch = conn.create_channel
+ # # we assume the queue exists and has messages
+ # delivery_info, properties, payload = ch.basic_get("bunny.examples.queue3", :ack => true)
+ # ch.basic_nack(delivery_info.delivery_tag, true)
+ #
+ #
+ # @example Requeue multiple messages fetched via basic.get
+ # conn = Bunny.new
+ # conn.start
+ #
+ # ch = conn.create_channel
+ # # we assume the queue exists and has messages
+ # _, _, payload1 = ch.basic_get("bunny.examples.queue3", :ack => true)
+ # _, _, payload2 = ch.basic_get("bunny.examples.queue3", :ack => true)
+ # delivery_info, properties, payload3 = ch.basic_get("bunny.examples.queue3", :ack => true)
+ # # requeue all fetched messages up to payload3
+ # ch.basic_nack(delivery_info.delivery_tag, true, true)
+ #
+ # @see http://rubybunny.info/articles/queues.html Queues and Consumers guide
+ # @see http://rubybunny.info/articles/extensions.html RabbitMQ Extensions guide
+ # @api public
def basic_nack(delivery_tag, requeue, multiple = false)
raise_if_no_longer_open!
@connection.send_frame(AMQ::Protocol::Basic::Nack.encode(@id,
delivery_tag,
requeue,
multiple))
nil
end
+ # Registers a consumer for queue. Delivered messages will be handled with the block
+ # provided to this method.
+ #
+ # @param [String, Bunny::Queue] queue Queue to consume from
+ # @param [String] consumer_tag Consumer tag (unique identifier), generated by Bunny by default
+ # @param [Boolean] no_ack (false) If false, delivered messages will be automatically acknowledged.
+ # If true, manual acknowledgements will be necessary.
+ # @param [Boolean] exclusive (false) Should this consumer be exclusive?
+ # @param [Hash] arguments (nil) Optional arguments that may be used by RabbitMQ extensions, etc
+ #
+ # @return [AMQ::Protocol::Basic::ConsumeOk] RabbitMQ response
+ # @see http://rubybunny.info/articles/queues.html Queues and Consumers guide
+ # @api public
def basic_consume(queue, consumer_tag = generate_consumer_tag, no_ack = false, exclusive = false, arguments = nil, &block)
raise_if_no_longer_open!
maybe_start_consumer_work_pool!
queue_name = if queue.respond_to?(:name)
@@ -278,19 +794,27 @@
# in the queue already. MK.
if consumer_tag && consumer_tag.strip != AMQ::Protocol::EMPTY_STRING
add_consumer(queue_name, consumer_tag, no_ack, exclusive, arguments, &block)
end
- Bunny::Timer.timeout(1, ClientTimeout) do
- @last_basic_consume_ok = @continuations.pop
+ Bunny::Timer.timeout(read_write_timeout, ClientTimeout) do
+ @last_basic_consume_ok = wait_on_continuations
end
# covers server-generated consumer tags
add_consumer(queue_name, @last_basic_consume_ok.consumer_tag, no_ack, exclusive, arguments, &block)
@last_basic_consume_ok
end
+ # Registers a consumer for queue as {Bunny::Consumer} instance.
+ #
+ # @param [Bunny::Consumer] consumer Consumer to register. It should already have queue name, consumer tag
+ # and other attributes set.
+ #
+ # @return [AMQ::Protocol::Basic::ConsumeOk] RabbitMQ response
+ # @see http://rubybunny.info/articles/queues.html Queues and Consumers guide
+ # @api public
def basic_consume_with(consumer)
raise_if_no_longer_open!
maybe_start_consumer_work_pool!
@connection.send_frame(AMQ::Protocol::Basic::Consume.encode(@id,
@@ -306,34 +830,61 @@
# in the queue already. MK.
if consumer.consumer_tag && consumer.consumer_tag.strip != AMQ::Protocol::EMPTY_STRING
register_consumer(consumer.consumer_tag, consumer)
end
- Bunny::Timer.timeout(1, ClientTimeout) do
- @last_basic_consume_ok = @continuations.pop
+ Bunny::Timer.timeout(read_write_timeout, ClientTimeout) do
+ @last_basic_consume_ok = wait_on_continuations
end
# covers server-generated consumer tags
register_consumer(@last_basic_consume_ok.consumer_tag, consumer)
raise_if_continuation_resulted_in_a_channel_error!
@last_basic_consume_ok
end
+ # Removes a consumer. Messages for this consumer will no longer be delivered. If the queue
+ # it was on is auto-deleted and this consumer was the last one, the queue will be deleted.
+ #
+ # @param [String] consumer_tag Consumer tag (unique identifier) to cancel
+ #
+ # @return [AMQ::Protocol::Basic::CancelOk] RabbitMQ response
+ # @see http://rubybunny.info/articles/queues.html Queues and Consumers guide
+ # @api public
def basic_cancel(consumer_tag)
@connection.send_frame(AMQ::Protocol::Basic::Cancel.encode(@id, consumer_tag, false))
- Bunny::Timer.timeout(1, ClientTimeout) do
- @last_basic_cancel_ok = @continuations.pop
+ Bunny::Timer.timeout(read_write_timeout, ClientTimeout) do
+ @last_basic_cancel_ok = wait_on_continuations
end
@last_basic_cancel_ok
end
+ # @endgroup
- # queue.*
+ # @group Queue operations (queue.*)
+
+ # Declares a queue using queue.declare AMQP 0.9.1 method.
+ #
+ # @param [String] name Queue name
+ # @param [Hash] opts Queue properties
+ #
+ # @option opts [Boolean] durable (false) Should information about this queue be persisted to disk so that it
+ # can survive broker restarts? Typically set to true for long-lived queues.
+ # @option opts [Boolean] auto_delete (false) Should this queue be deleted when the last consumer is cancelled?
+ # @option opts [Boolean] exclusive (false) Should only this connection be able to use this queue?
+ # If true, the queue will be automatically deleted when this
+ # connection is closed
+ # @option opts [Boolean] passive (false) If true, queue will be checked for existence. If it does not
+ # exist, {Bunny::NotFound} will be raised.
+ #
+ # @return [AMQ::Protocol::Queue::DeclareOk] RabbitMQ response
+ # @see http://rubybunny.info/articles/queues.html Queues and Consumers guide
+ # @api public
def queue_declare(name, opts = {})
raise_if_no_longer_open!
@connection.send_frame(AMQ::Protocol::Queue::Declare.encode(@id,
name,
@@ -341,46 +892,77 @@
opts.fetch(:durable, false),
opts.fetch(:exclusive, false),
opts.fetch(:auto_delete, false),
false,
opts[:arguments]))
- @last_queue_declare_ok = @continuations.pop
+ @last_queue_declare_ok = wait_on_continuations
raise_if_continuation_resulted_in_a_channel_error!
@last_queue_declare_ok
end
+ # Deletes a queue using queue.delete AMQP 0.9.1 method
+ #
+ # @param [String] name Queue name
+ # @param [Hash] opts Options
+ #
+ # @option opts [Boolean] if_unused (false) Should this queue be deleted only if it has no consumers?
+ # @option opts [Boolean] if_empty (false) Should this queue be deleted only if it has no messages?
+ #
+ # @return [AMQ::Protocol::Queue::DeleteOk] RabbitMQ response
+ # @see http://rubybunny.info/articles/queues.html Queues and Consumers guide
+ # @api public
def queue_delete(name, opts = {})
raise_if_no_longer_open!
@connection.send_frame(AMQ::Protocol::Queue::Delete.encode(@id,
name,
opts[:if_unused],
opts[:if_empty],
false))
- Bunny::Timer.timeout(1, ClientTimeout) do
- @last_queue_delete_ok = @continuations.pop
+ Bunny::Timer.timeout(read_write_timeout, ClientTimeout) do
+ @last_queue_delete_ok = wait_on_continuations
end
raise_if_continuation_resulted_in_a_channel_error!
@last_queue_delete_ok
end
+ # Purges a queue (removes all messages from it) using queue.purge AMQP 0.9.1 method.
+ #
+ # @param [String] name Queue name
+ #
+ # @return [AMQ::Protocol::Queue::PurgeOk] RabbitMQ response
+ # @see http://rubybunny.info/articles/queues.html Queues and Consumers guide
+ # @api public
def queue_purge(name, opts = {})
raise_if_no_longer_open!
@connection.send_frame(AMQ::Protocol::Queue::Purge.encode(@id, name, false))
- Bunny::Timer.timeout(1, ClientTimeout) do
- @last_queue_purge_ok = @continuations.pop
+ Bunny::Timer.timeout(read_write_timeout, ClientTimeout) do
+ @last_queue_purge_ok = wait_on_continuations
end
raise_if_continuation_resulted_in_a_channel_error!
@last_queue_purge_ok
end
+ # Binds a queue to an exchange using queue.bind AMQP 0.9.1 method
+ #
+ # @param [String] name Queue name
+ # @param [String] exchange Exchange name
+ # @param [Hash] opts Options
+ #
+ # @option opts [String] routing_key (nil) Routing key used for binding
+ # @option opts [Hash] arguments ({}) Optional arguments
+ #
+ # @return [AMQ::Protocol::Queue::BindOk] RabbitMQ response
+ # @see http://rubybunny.info/articles/queues.html Queues and Consumers guide
+ # @see http://rubybunny.info/articles/bindings.html Bindings guide
+ # @api public
def queue_bind(name, exchange, opts = {})
raise_if_no_longer_open!
exchange_name = if exchange.respond_to?(:name)
exchange.name
@@ -392,18 +974,31 @@
name,
exchange_name,
opts[:routing_key],
false,
opts[:arguments]))
- Bunny::Timer.timeout(1, ClientTimeout) do
- @last_queue_bind_ok = @continuations.pop
+ Bunny::Timer.timeout(read_write_timeout, ClientTimeout) do
+ @last_queue_bind_ok = wait_on_continuations
end
raise_if_continuation_resulted_in_a_channel_error!
@last_queue_bind_ok
end
+ # Unbinds a queue from an exchange using queue.unbind AMQP 0.9.1 method
+ #
+ # @param [String] name Queue name
+ # @param [String] exchange Exchange name
+ # @param [Hash] opts Options
+ #
+ # @option opts [String] routing_key (nil) Routing key used for binding
+ # @option opts [Hash] arguments ({}) Optional arguments
+ #
+ # @return [AMQ::Protocol::Queue::UnbindOk] RabbitMQ response
+ # @see http://rubybunny.info/articles/queues.html Queues and Consumers guide
+ # @see http://rubybunny.info/articles/bindings.html Bindings guide
+ # @api public
def queue_unbind(name, exchange, opts = {})
raise_if_no_longer_open!
exchange_name = if exchange.respond_to?(:name)
exchange.name
@@ -414,21 +1009,37 @@
@connection.send_frame(AMQ::Protocol::Queue::Unbind.encode(@id,
name,
exchange_name,
opts[:routing_key],
opts[:arguments]))
- Bunny::Timer.timeout(1, ClientTimeout) do
- @last_queue_unbind_ok = @continuations.pop
+ Bunny::Timer.timeout(read_write_timeout, ClientTimeout) do
+ @last_queue_unbind_ok = wait_on_continuations
end
raise_if_continuation_resulted_in_a_channel_error!
@last_queue_unbind_ok
end
+ # @endgroup
- # exchange.*
+ # @group Exchange operations (exchange.*)
+
+ # Declares a echange using echange.declare AMQP 0.9.1 method.
+ #
+ # @param [String] name Exchange name
+ # @param [Hash] opts Exchange properties
+ #
+ # @option opts [Boolean] durable (false) Should information about this echange be persisted to disk so that it
+ # can survive broker restarts? Typically set to true for long-lived exchanges.
+ # @option opts [Boolean] auto_delete (false) Should this echange be deleted when it is no longer used?
+ # @option opts [Boolean] passive (false) If true, exchange will be checked for existence. If it does not
+ # exist, {Bunny::NotFound} will be raised.
+ #
+ # @return [AMQ::Protocol::Exchange::DeclareOk] RabbitMQ response
+ # @see http://rubybunny.info/articles/echanges.html Exchanges and Publishing guide
+ # @api public
def exchange_declare(name, type, opts = {})
raise_if_no_longer_open!
@connection.send_frame(AMQ::Protocol::Exchange::Declare.encode(@id,
name,
@@ -437,33 +1048,58 @@
opts.fetch(:durable, false),
opts.fetch(:auto_delete, false),
false,
false,
opts[:arguments]))
- Bunny::Timer.timeout(1, ClientTimeout) do
- @last_exchange_declare_ok = @continuations.pop
+ Bunny::Timer.timeout(read_write_timeout, ClientTimeout) do
+ @last_exchange_declare_ok = wait_on_continuations
end
raise_if_continuation_resulted_in_a_channel_error!
@last_exchange_declare_ok
end
+ # Deletes a exchange using exchange.delete AMQP 0.9.1 method
+ #
+ # @param [String] name Exchange name
+ # @param [Hash] opts Options
+ #
+ # @option opts [Boolean] if_unused (false) Should this exchange be deleted only if it is no longer used
+ #
+ # @return [AMQ::Protocol::Exchange::DeleteOk] RabbitMQ response
+ # @see http://rubybunny.info/articles/exchanges.html Exchanges and Publishing guide
+ # @api public
def exchange_delete(name, opts = {})
raise_if_no_longer_open!
@connection.send_frame(AMQ::Protocol::Exchange::Delete.encode(@id,
name,
opts[:if_unused],
false))
- Bunny::Timer.timeout(1, ClientTimeout) do
- @last_exchange_delete_ok = @continuations.pop
+ Bunny::Timer.timeout(read_write_timeout, ClientTimeout) do
+ @last_exchange_delete_ok = wait_on_continuations
end
raise_if_continuation_resulted_in_a_channel_error!
@last_exchange_delete_ok
end
+ # Binds an exchange to another exchange using exchange.bind AMQP 0.9.1 extension
+ # that RabbitMQ provides.
+ #
+ # @param [String] source Source exchange name
+ # @param [String] destination Destination exchange name
+ # @param [Hash] opts Options
+ #
+ # @option opts [String] routing_key (nil) Routing key used for binding
+ # @option opts [Hash] arguments ({}) Optional arguments
+ #
+ # @return [AMQ::Protocol::Exchange::BindOk] RabbitMQ response
+ # @see http://rubybunny.info/articles/exchanges.html Exchanges and Publishing guide
+ # @see http://rubybunny.info/articles/bindings.html Bindings guide
+ # @see http://rubybunny.info/articles/extensions.html RabbitMQ Extensions guide
+ # @api public
def exchange_bind(source, destination, opts = {})
raise_if_no_longer_open!
source_name = if source.respond_to?(:name)
source.name
@@ -481,18 +1117,33 @@
destination_name,
source_name,
opts[:routing_key],
false,
opts[:arguments]))
- Bunny::Timer.timeout(1, ClientTimeout) do
- @last_exchange_bind_ok = @continuations.pop
+ Bunny::Timer.timeout(read_write_timeout, ClientTimeout) do
+ @last_exchange_bind_ok = wait_on_continuations
end
raise_if_continuation_resulted_in_a_channel_error!
@last_exchange_bind_ok
end
+ # Unbinds an exchange from another exchange using exchange.unbind AMQP 0.9.1 extension
+ # that RabbitMQ provides.
+ #
+ # @param [String] source Source exchange name
+ # @param [String] destination Destination exchange name
+ # @param [Hash] opts Options
+ #
+ # @option opts [String] routing_key (nil) Routing key used for binding
+ # @option opts [Hash] arguments ({}) Optional arguments
+ #
+ # @return [AMQ::Protocol::Exchange::UnbindOk] RabbitMQ response
+ # @see http://rubybunny.info/articles/exchanges.html Exchanges and Publishing guide
+ # @see http://rubybunny.info/articles/bindings.html Bindings guide
+ # @see http://rubybunny.info/articles/extensions.html RabbitMQ Extensions guide
+ # @api public
def exchange_unbind(source, destination, opts = {})
raise_if_no_longer_open!
source_name = if source.respond_to?(:name)
source.name
@@ -510,158 +1161,274 @@
destination_name,
source_name,
opts[:routing_key],
false,
opts[:arguments]))
- Bunny::Timer.timeout(1, ClientTimeout) do
- @last_exchange_unbind_ok = @continuations.pop
+ Bunny::Timer.timeout(read_write_timeout, ClientTimeout) do
+ @last_exchange_unbind_ok = wait_on_continuations
end
raise_if_continuation_resulted_in_a_channel_error!
@last_exchange_unbind_ok
end
- # channel.*
+ # @endgroup
+
+
+ # @group Flow control (channel.*)
+
+ # Enables or disables message flow for the channel. When message flow is disabled,
+ # no new messages will be delivered to consumers on this channel. This is typically
+ # used by consumers that cannot keep up with the influx of messages.
+ #
+ # @note Recent (e.g. 2.8.x., 3.x) RabbitMQ will employ TCP/IP-level back pressure on publishers if it detects
+ # that consumers do not keep up with them.
+ #
+ # @return [AMQ::Protocol::Channel::FlowOk] RabbitMQ response
+ # @see http://rubybunny.info/articles/queues.html Queues and Consumers guide
+ # @api public
def channel_flow(active)
raise_if_no_longer_open!
@connection.send_frame(AMQ::Protocol::Channel::Flow.encode(@id, active))
- Bunny::Timer.timeout(1, ClientTimeout) do
- @last_channel_flow_ok = @continuations.pop
+ Bunny::Timer.timeout(read_write_timeout, ClientTimeout) do
+ @last_channel_flow_ok = wait_on_continuations
end
raise_if_continuation_resulted_in_a_channel_error!
@last_channel_flow_ok
end
- # tx.*
+ # @endgroup
+
+
+ # @group Transactions (tx.*)
+
+ # Puts the channel into transaction mode (starts a transaction)
+ # @return [AMQ::Protocol::Tx::SelectOk] RabbitMQ response
+ # @api public
def tx_select
raise_if_no_longer_open!
@connection.send_frame(AMQ::Protocol::Tx::Select.encode(@id))
- Bunny::Timer.timeout(1, ClientTimeout) do
- @last_tx_select_ok = @continuations.pop
+ Bunny::Timer.timeout(read_write_timeout, ClientTimeout) do
+ @last_tx_select_ok = wait_on_continuations
end
raise_if_continuation_resulted_in_a_channel_error!
@last_tx_select_ok
end
+ # Commits current transaction
+ # @return [AMQ::Protocol::Tx::CommitOk] RabbitMQ response
+ # @api public
def tx_commit
raise_if_no_longer_open!
@connection.send_frame(AMQ::Protocol::Tx::Commit.encode(@id))
- Bunny::Timer.timeout(1, ClientTimeout) do
- @last_tx_commit_ok = @continuations.pop
+ Bunny::Timer.timeout(read_write_timeout, ClientTimeout) do
+ @last_tx_commit_ok = wait_on_continuations
end
raise_if_continuation_resulted_in_a_channel_error!
@last_tx_commit_ok
end
+ # Rolls back current transaction
+ # @return [AMQ::Protocol::Tx::RollbackOk] RabbitMQ response
+ # @api public
def tx_rollback
raise_if_no_longer_open!
@connection.send_frame(AMQ::Protocol::Tx::Rollback.encode(@id))
- Bunny::Timer.timeout(1, ClientTimeout) do
- @last_tx_rollback_ok = @continuations.pop
+ Bunny::Timer.timeout(read_write_timeout, ClientTimeout) do
+ @last_tx_rollback_ok = wait_on_continuations
end
raise_if_continuation_resulted_in_a_channel_error!
@last_tx_rollback_ok
end
- # confirm.*
+ # @endgroup
+
+
+ # @group Publisher Confirms (confirm.*)
+
+ # @return [Boolean] true if this channel has Publisher Confirms enabled, false otherwise
+ # @api public
+ def using_publisher_confirmations?
+ @next_publish_seq_no > 0
+ end
+
+ # Enables publisher confirms for the channel.
+ # @return [AMQ::Protocol::Confirm::SelectOk] RabbitMQ response
+ # @see #wait_for_confirms
+ # @see #unconfirmed_set
+ # @see #nacked_set
+ # @see http://rubybunny.info/articles/extensions.html RabbitMQ Extensions guide
+ # @api public
def confirm_select(callback=nil)
raise_if_no_longer_open!
if @next_publish_seq_no == 0
@confirms_continuations = ::Queue.new
@unconfirmed_set = Set.new
+ @nacked_set = Set.new
@next_publish_seq_no = 1
end
@confirms_callback = callback
@connection.send_frame(AMQ::Protocol::Confirm::Select.encode(@id, false))
- Bunny::Timer.timeout(1, ClientTimeout) do
- @last_confirm_select_ok = @continuations.pop
+ Bunny::Timer.timeout(read_write_timeout, ClientTimeout) do
+ @last_confirm_select_ok = wait_on_continuations
end
raise_if_continuation_resulted_in_a_channel_error!
@last_confirm_select_ok
end
+ # Blocks calling thread until confirms are received for all
+ # currently unacknowledged published messages.
+ #
+ # @return [Boolean] true if all messages were acknowledged positively, false otherwise
+ # @see #confirm_select
+ # @see #unconfirmed_set
+ # @see #nacked_set
+ # @see http://rubybunny.info/articles/extensions.html RabbitMQ Extensions guide
+ # @api public
def wait_for_confirms
- @only_acks_received = true
- @confirms_continuations.pop
+ wait_on_confirms_continuations
@only_acks_received
end
+ # @endgroup
+
+ # @group Misc
+
+ # Synchronizes given block using this channel's mutex.
+ # @api public
+ def synchronize(&block)
+ @publishing_mutex.synchronize(&block)
+ end
+
+ # Unique string supposed to be used as a consumer tag.
#
+ # @return [String] Unique string.
+ # @api plugin
+ def generate_consumer_tag(name = "bunny")
+ "#{name}-#{Time.now.to_i * 1000}-#{Kernel.rand(999_999_999_999)}"
+ end
+
+ # @endgroup
+
+
+ #
+ # Error Handilng
+ #
+
+ # Defines a handler for errors that are not responses to a particular
+ # operations (e.g. basic.ack, basic.reject, basic.nack).
+ #
+ # @api public
+ def on_error(&block)
+ @on_error = block
+ end
+
+
+ #
# Recovery
#
+ # @group Network Failure Recovery
+
+ # Recovers basic.qos setting, exchanges, queues and consumers. Used by the Automatic Network Failure
+ # Recovery feature.
+ #
+ # @api plugin
def recover_from_network_failure
- # puts "Recovering channel #{@id} from network failure..."
+ # puts "Recovering channel #{@id}"
+ release_all_continuations
+
recover_prefetch_setting
recover_exchanges
# this includes recovering bindings
recover_queues
recover_consumers
end
+ # Recovers basic.qos setting. Used by the Automatic Network Failure
+ # Recovery feature.
+ #
+ # @api plugin
def recover_prefetch_setting
basic_qos(@prefetch_count) if @prefetch_count
end
+ # Recovers exchanges. Used by the Automatic Network Failure
+ # Recovery feature.
+ #
+ # @api plugin
def recover_exchanges
@exchanges.values.dup.each do |x|
x.recover_from_network_failure
end
end
+ # Recovers queues and bindings. Used by the Automatic Network Failure
+ # Recovery feature.
+ #
+ # @api plugin
def recover_queues
@queues.values.dup.each do |q|
+ # puts "Recovering queue #{q.name}"
q.recover_from_network_failure
end
end
+ # Recovers consumers. Used by the Automatic Network Failure
+ # Recovery feature.
+ #
+ # @api plugin
def recover_consumers
unless @consumers.empty?
@work_pool = ConsumerWorkPool.new(@work_pool.size)
@work_pool.start
end
@consumers.values.dup.each do |c|
c.recover_from_network_failure
end
end
+ # @endgroup
#
# Implementation
#
+ # @private
def register_consumer(consumer_tag, consumer)
@consumer_mutex.synchronize do
@consumers[consumer_tag] = consumer
end
end
+ # @private
def add_consumer(queue, consumer_tag, no_ack, exclusive, arguments, &block)
@consumer_mutex.synchronize do
c = Consumer.new(self, queue, consumer_tag, no_ack, exclusive, arguments)
c.on_delivery(&block) if block
@consumers[consumer_tag] = c
end
end
+ # @private
def handle_method(method)
# puts "Channel#handle_frame on channel #{@id}: #{method.inspect}"
case method
when AMQ::Protocol::Queue::DeclareOk then
@continuations.push(method)
@@ -707,31 +1474,44 @@
when AMQ::Protocol::Basic::Ack then
handle_ack_or_nack(method.delivery_tag, method.multiple, false)
when AMQ::Protocol::Basic::Nack then
handle_ack_or_nack(method.delivery_tag, method.multiple, true)
when AMQ::Protocol::Channel::Close then
- # puts "Exception on channel #{@id}: #{method.reply_code} #{method.reply_text}"
closed!
@connection.send_frame(AMQ::Protocol::Channel::CloseOk.encode(@id))
- @last_channel_error = instantiate_channel_level_exception(method)
- @continuations.push(method)
+ # basic.ack, basic.reject, basic.nack. MK.
+ if channel_level_exception_after_operation_that_has_no_response?(method)
+ @on_error.call(self, method) if @on_error
+ else
+ @last_channel_error = instantiate_channel_level_exception(method)
+ @continuations.push(method)
+ end
+
when AMQ::Protocol::Channel::CloseOk then
@continuations.push(method)
else
raise "Do not know how to handle #{method.inspect} in Bunny::Channel#handle_method"
end
end
+ # @private
+ def channel_level_exception_after_operation_that_has_no_response?(method)
+ method.reply_code == 406 && method.reply_text =~ /unknown delivery tag/
+ end
+
+ # @private
def handle_basic_get_ok(basic_get_ok, properties, content)
- @continuations.push([basic_get_ok, properties, content])
+ @basic_get_continuations.push([basic_get_ok, properties, content])
end
+ # @private
def handle_basic_get_empty(basic_get_empty)
- @continuations.push([nil, nil, nil])
+ @basic_get_continuations.push([nil, nil, nil])
end
+ # @private
def handle_frameset(basic_deliver, properties, content)
consumer = @consumers[basic_deliver.consumer_tag]
if consumer
@work_pool.submit do
consumer.call(DeliveryInfo.new(basic_deliver), MessageProperties.new(properties), content)
@@ -740,21 +1520,33 @@
# TODO: log it
puts "[warning] No consumer for tag #{basic_deliver.consumer_tag}"
end
end
+ # @private
def handle_basic_return(basic_return, properties, content)
x = find_exchange(basic_return.exchange)
if x
x.handle_return(ReturnInfo.new(basic_return), MessageProperties.new(properties), content)
else
# TODO: log a warning
end
end
+ # @private
def handle_ack_or_nack(delivery_tag, multiple, nack)
+ if nack
+ cloned_set = @unconfirmed_set.clone
+ if multiple
+ cloned_set.keep_if { |i| i <= delivery_tag }
+ @nacked_set.merge(cloned_set)
+ else
+ @nacked_set.add(delivery_tag)
+ end
+ end
+
if multiple
@unconfirmed_set.delete_if { |i| i <= delivery_tag }
else
@unconfirmed_set.delete(delivery_tag)
end
@@ -766,79 +1558,153 @@
@confirms_callback.call(delivery_tag, multiple, nack) if @confirms_callback
end
end
+ # @private
+ def wait_on_continuations
+ if @connection.threaded
+ t = Thread.current
+ @threads_waiting_on_continuations << t
+
+ v = @continuations.pop
+ @threads_waiting_on_continuations.delete(t)
+
+ v
+ else
+ connection.event_loop.run_once until @continuations.length > 0
+
+ @continuations.pop
+ end
+ end
+
+ # @private
+ def wait_on_basic_get_continuations
+ if @connection.threaded
+ t = Thread.current
+ @threads_waiting_on_basic_get_continuations << t
+
+ v = @basic_get_continuations.pop
+ @threads_waiting_on_basic_get_continuations.delete(t)
+
+ v
+ else
+ connection.event_loop.run_once until @basic_get_continuations.length > 0
+
+ @basic_get_continuations.pop
+ end
+ end
+
+ # @private
+ def wait_on_confirms_continuations
+ if @connection.threaded
+ t = Thread.current
+ @threads_waiting_on_confirms_continuations << t
+
+ v = @confirms_continuations.pop
+ @threads_waiting_on_confirms_continuations.delete(t)
+
+ v
+ else
+ connection.event_loop.run_once until @confirms_continuations.length > 0
+
+ @confirms_continuations.pop
+ end
+ end
+
+ # Releases all continuations. Used by automatic network recovery.
+ # @private
+ def release_all_continuations
+ if @confirms_continuations.num_waiting > 0
+ @threads_waiting_on_confirms_continuations.each do |t|
+ t.run
+ end
+ end
+ if @continuations.num_waiting > 0
+ @threads_waiting_on_continuations.each do |t|
+ t.run
+ end
+ end
+ if @basic_get_continuations.num_waiting > 0
+ @threads_waiting_on_basic_get_continuations.each do |t|
+ t.run
+ end
+ end
+
+ @continuations = ::Queue.new
+ @confirms_continuations = ::Queue.new
+ @basic_get_continuations = ::Queue.new
+ end
+
# Starts consumer work pool. Lazily called by #basic_consume to avoid creating new threads
# that won't do any real work for channels that do not register consumers (e.g. only used for
# publishing). MK.
+ # @private
def maybe_start_consumer_work_pool!
@work_pool.start unless @work_pool.started?
end
+ # @private
def maybe_pause_consumer_work_pool!
@work_pool.pause if @work_pool && @work_pool.started?
end
+ # @private
def maybe_kill_consumer_work_pool!
@work_pool.kill if @work_pool && @work_pool.started?
end
+ # @private
def read_next_frame(options = {})
@connection.read_next_frame(options = {})
end
- # Synchronizes given block using this channel's mutex.
- # @api public
- def synchronize(&block)
- @publishing_mutex.synchronize(&block)
- end
-
+ # @private
def deregister_queue(queue)
@queues.delete(queue.name)
end
+ # @private
def deregister_queue_named(name)
@queues.delete(name)
end
+ # @private
def register_queue(queue)
@queues[queue.name] = queue
end
+ # @private
def find_queue(name)
@queues[name]
end
+ # @private
def deregister_exchange(exchange)
@exchanges.delete(exchange.name)
end
+ # @private
def register_exchange(exchange)
@exchanges[exchange.name] = exchange
end
+ # @private
def find_exchange(name)
@exchanges[name]
end
- # Unique string supposed to be used as a consumer tag.
- #
- # @return [String] Unique string.
- # @api plugin
- def generate_consumer_tag(name = "bunny")
- "#{name}-#{Time.now.to_i * 1000}-#{Kernel.rand(999_999_999_999)}"
- end
-
protected
+ # @private
def closed!
@status = :closed
@work_pool.shutdown
@connection.release_channel_id(@id)
end
+ # @private
def instantiate_channel_level_exception(frame)
case frame
when AMQ::Protocol::Channel::Close then
klass = case frame.reply_code
when 403 then
@@ -855,13 +1721,15 @@
klass.new(frame.reply_text, self, frame)
end
end
+ # @private
def raise_if_continuation_resulted_in_a_channel_error!
raise @last_channel_error if @last_channel_error
end
+ # @private
def raise_if_no_longer_open!
raise ChannelAlreadyClosed.new("cannot use a channel that was already closed! Channel id: #{@id}", self) if closed?
end
end
end