lib/amqp/queue.rb in amqp-1.1.0.pre1 vs lib/amqp/queue.rb in amqp-1.1.0.pre2

- old
+ new

@@ -1,8 +1,10 @@ # encoding: utf-8 -require "amq/client/queue" +require "amqp/entity" + +require "amq/protocol/get_response" require "amqp/consumer" module AMQP # h2. What are AMQP queues? # @@ -116,23 +118,49 @@ # persistence. # # # @see http://files.travis-ci.org/docs/amqp/0.9.1/AMQP091Specification.pdf AMQP 0.9.1 specification (Section 2.1.1) # @see AMQP::Exchange - class Queue < AMQ::Client::Queue + class Queue # + # Behaviours + # + + include Entity + include ServerNamedEntity + extend ProtocolMethodHandlers + + + + # # API # # Name of this queue attr_reader :name # Options this queue object was instantiated with attr_accessor :opts + # Channel this queue belongs to. + # @return [AMQP::Channel] + attr_reader :channel + # @return [Array<Hash>] All consumers on this queue. + attr_reader :consumers + # @return [AMQP::Consumer] Default consumer (registered with {Queue#consume}). + attr_reader :default_consumer + + # @return [Hash] Additional arguments given on queue declaration. Typically used by AMQP extensions. + attr_reader :arguments + + # @return [Array<Hash>] + attr_reader :bindings + + + # @option opts [Boolean] :passive (false) If set, the server will not create the queue if it does not # already exist. The client can use this to check whether the queue # exists without modifying the server state. # # @option opts [Boolean] :durable (false) If set when creating a new queue, the queue will be marked as @@ -165,11 +193,11 @@ # the queue. # # # @yield [queue, declare_ok] Yields successfully declared queue instance and AMQP method (queue.declare-ok) instance. The latter is optional. # @yieldparam [Queue] queue Queue that is successfully declared and is ready to be used. - # @yieldparam [AMQP::Protocol::Queue::DeclareOk] declare_ok AMQP queue.declare-ok) instance. + # @yieldparam [AMQ::Protocol::Queue::DeclareOk] declare_ok AMQP queue.declare-ok) instance. # # @api public def initialize(channel, name = AMQ::Protocol::EMPTY_STRING, opts = {}, &block) raise ArgumentError.new("queue name must not be nil; if you want broker to generate queue name for you, pass an empty string") if name.nil? @@ -181,14 +209,31 @@ raise ArgumentError.new("server-named queues (name = '') declaration with :nowait => true makes no sense. If you are not sure what that means, simply drop :nowait => true from opts.") if @server_named && @opts[:nowait] # a deferrable that we use to delay operations until this queue is actually declared. # one reason for this is to support a case when a server-named queue is immediately bound. # it's crazy, but 0.7.x supports it, so... MK. - @declaration_deferrable = AMQ::Client::EventMachineClient::Deferrable.new + @declaration_deferrable = AMQP::Deferrable.new - super(channel.connection, channel, name) + super(channel.connection) + @name = name + # this has to stay true even after queue.declare-ok arrives. MK. + @server_named = @name.empty? + if @server_named + self.on_connection_interruption do + # server-named queue need to get new names after recovery. MK. + @name = AMQ::Protocol::EMPTY_STRING + end + end + + @channel = channel + + # primarily for autorecovery. MK. + @bindings = Array.new + + @consumers = Hash.new + shim = Proc.new do |q, declare_ok| case block.arity when 1 then block.call(q) else block.call(q, declare_ok) @@ -200,15 +245,15 @@ @declaration_deferrable.succeed block.call(self) if block end if block - self.declare(@opts[:passive], @opts[:durable], @opts[:exclusive], @opts[:auto_delete], @opts[:nowait], @opts[:arguments], &shim) + self.queue_declare(@opts[:passive], @opts[:durable], @opts[:exclusive], @opts[:auto_delete], @opts[:nowait], @opts[:arguments], &shim) else - # we cannot pass :nowait as true here, AMQ::Client::Queue will (rightfully) raise an exception because + # we cannot pass :nowait as true here, AMQP::Queue will (rightfully) raise an exception because # it has no idea about crazy edge cases we are trying to support for sake of backwards compatibility. MK. - self.declare(@opts[:passive], @opts[:durable], @opts[:exclusive], @opts[:auto_delete], false, @opts[:arguments]) + self.queue_declare(@opts[:passive], @opts[:durable], @opts[:exclusive], @opts[:auto_delete], false, @opts[:arguments]) end end end # Defines a callback that will be executed once queue is declared. More than one callback can be defined. @@ -222,10 +267,29 @@ block.call if @channel.open? end end # once_declared(&block) + # @return [Boolean] true if this queue was declared as durable (will survive broker restart). + # @api public + def durable? + @durable + end # durable? + + # @return [Boolean] true if this queue was declared as exclusive (limited to just one consumer) + # @api public + def exclusive? + @exclusive + end # exclusive? + + # @return [Boolean] true if this queue was declared as automatically deleted (deleted as soon as last consumer unbinds). + # @api public + def auto_delete? + @auto_delete + end # auto_delete? + + # @return [Boolean] true if this queue is server-named def server_named? @server_named end # server_named? @@ -280,11 +344,11 @@ # @api public # @see Queue#unbind def bind(exchange, opts = {}, &block) @channel.once_open do self.once_name_is_available do - super(exchange, (opts[:key] || opts[:routing_key] || AMQ::Protocol::EMPTY_STRING), (opts[:nowait] || block.nil?), opts[:arguments], &block) + queue_bind(exchange, (opts[:key] || opts[:routing_key] || AMQ::Protocol::EMPTY_STRING), (opts[:nowait] || block.nil?), opts[:arguments], &block) end end self end @@ -349,11 +413,11 @@ # @api public # @see Queue#bind def unbind(exchange, opts = {}, &block) @channel.once_open do self.once_name_is_available do - super(exchange, (opts[:key] || opts[:routing_key] || AMQ::Protocol::EMPTY_STRING), opts[:arguments], &block) + queue_unbind(exchange, (opts[:key] || opts[:routing_key] || AMQ::Protocol::EMPTY_STRING), opts[:arguments], &block) end end end @@ -377,19 +441,19 @@ # # # @return [NilClass] nil (for v0.7 compatibility) # # @yield [delete_ok] Yields AMQP method (queue.delete-ok) instance. - # @yieldparam [AMQP::Protocol::Queue::DeleteOk] delete_ok AMQP queue.delete-ok) instance. Carries number of messages that were in the queue. + # @yieldparam [AMQ::Protocol::Queue::DeleteOk] delete_ok AMQP queue.delete-ok) instance. Carries number of messages that were in the queue. # # @api public # @see Queue#purge # @see Queue#unbind def delete(opts = {}, &block) @channel.once_open do self.once_name_is_available do - super(opts.fetch(:if_unused, false), opts.fetch(:if_empty, false), opts.fetch(:nowait, false), &block) + queue_delete(opts.fetch(:if_unused, false), opts.fetch(:if_empty, false), opts.fetch(:nowait, false), &block) end end # backwards compatibility nil @@ -404,19 +468,19 @@ # # @return [NilClass] nil (for v0.7 compatibility) # # # @yield [purge_ok] Yields AMQP method (queue.purge-ok) instance. - # @yieldparam [AMQP::Protocol::Queue::PurgeOk] purge_ok AMQP queue.purge-ok) instance. Carries number of messages that were purged. + # @yieldparam [AMQ::Protocol::Queue::PurgeOk] purge_ok AMQP queue.purge-ok) instance. Carries number of messages that were purged. # # @api public # @see Queue#delete # @see Queue#unbind def purge(opts = {}, &block) @channel.once_open do self.once_declared do - super(opts.fetch(:nowait, false), &block) + queue_purge(opts.fetch(:nowait, false), &block) end end # backwards compatibility nil @@ -475,11 +539,11 @@ end } @channel.once_open do self.once_name_is_available do - # see AMQ::Client::Queue#get in amq-client + # see AMQP::Queue#get in amq-client self.get(!opts.fetch(:ack, false), &shim) end end else @channel.once_open do @@ -725,19 +789,26 @@ @channel.once_open do self.once_name_is_available do # guards against a pathological case race condition when a channel # is opened and closed before delayed operations are completed. - self.consume(!opts[:ack], opts[:exclusive], (opts[:nowait] || block.nil?), opts[:no_local], nil, &opts[:confirm]) + self.basic_consume(!opts[:ack], opts[:exclusive], (opts[:nowait] || block.nil?), opts[:no_local], nil, &opts[:confirm]) self.on_delivery(&block) end end self end + # @api public + # @see http://bit.ly/amqp091reference AMQP 0.9.1 protocol reference (Sections 1.8.3.9) + def on_delivery(&block) + @default_consumer.on_delivery(&block) + end # on_delivery(&block) + + # @return [String] Consumer tag of the default consumer associated with this queue (if any), or nil # @note Default consumer is the one registered with the convenience {AMQP::Queue#subscribe} method. It has no special properties of any kind. # @see Queue#subscribe # @see AMQP::Consumer # @api public @@ -784,11 +855,11 @@ # @option opts [Boolean] :nowait (true) If set, the server will not respond to the method. The client should # not wait for a reply method, the callback (if passed) will be ignored. If the server could not complete the # method it will raise a channel or connection exception. # # @yield [cancel_ok] - # @yieldparam [AMQP::Protocol::Basic::CancelOk] cancel_ok AMQP method basic.cancel-ok. You can obtain consumer tag from it. + # @yieldparam [AMQ::Protocol::Basic::CancelOk] cancel_ok AMQP method basic.cancel-ok. You can obtain consumer tag from it. # # # @api public def unsubscribe(opts = {}, &block) @channel.once_open do @@ -878,17 +949,453 @@ # @private # @api plugin def handle_connection_interruption(method = nil) - super(method) + @consumers.each { |tag, consumer| consumer.handle_connection_interruption(method) } + self.exec_callback_yielding_self(:after_connection_interruption) + @declaration_deferrable = EventMachine::DefaultDeferrable.new end def handle_declare_ok(method) - super(method) + @name = method.queue if @name.empty? + @channel.register_queue(self) + + self.exec_callback_once_yielding_self(:declare, method) + @declaration_deferrable.succeed + end + + + # @group Declaration + + # Declares this queue. + # + # + # @return [Queue] self + # + # @api public + # @see http://bit.ly/amqp091reference AMQP 0.9.1 protocol reference (Section 1.7.2.1.) + def queue_declare(passive = false, durable = false, exclusive = false, auto_delete = false, nowait = false, arguments = nil, &block) + raise ArgumentError, "declaration with nowait does not make sense for server-named queues! Either specify name other than empty string or use #declare without nowait" if nowait && self.anonymous? + + # these two are for autorecovery. MK. + @passive = passive + @server_named = @name.empty? + + @durable = durable + @exclusive = exclusive + @auto_delete = auto_delete + @arguments = arguments + + nowait = true if !block && !@name.empty? && nowait.nil? + @connection.send_frame(AMQ::Protocol::Queue::Declare.encode(@channel.id, @name, passive, durable, exclusive, auto_delete, nowait, arguments)) + + if !nowait + self.append_callback(:declare, &block) + @channel.queues_awaiting_declare_ok.push(self) + end + + self + end + + # Re-declares queue with the same attributes + # @api public + def redeclare(&block) + nowait = true if !block && !@name.empty? + + # server-named queues get their new generated names. + new_name = if @server_named + AMQ::Protocol::EMPTY_STRING + else + @name + end + @connection.send_frame(AMQ::Protocol::Queue::Declare.encode(@channel.id, new_name, @passive, @durable, @exclusive, @auto_delete, false, @arguments)) + + if !nowait + self.append_callback(:declare, &block) + @channel.queues_awaiting_declare_ok.push(self) + end + + self + end + + # @endgroup + + + + # Deletes this queue. + # + # @param [Boolean] if_unused delete only if queue has no consumers (subscribers). + # @param [Boolean] if_empty delete only if queue has no messages in it. + # @param [Boolean] nowait Don't wait for reply from broker. + # @return [Queue] self + # + # @api public + # @see http://bit.ly/amqp091reference AMQP 0.9.1 protocol reference (Section 1.7.2.9.) + def queue_delete(if_unused = false, if_empty = false, nowait = false, &block) + nowait = true unless block + @connection.send_frame(AMQ::Protocol::Queue::Delete.encode(@channel.id, @name, if_unused, if_empty, nowait)) + + if !nowait + self.append_callback(:delete, &block) + + # TODO: delete itself from queues cache + @channel.queues_awaiting_delete_ok.push(self) + end + + self + end # delete(channel, queue, if_unused, if_empty, nowait, &block) + + + + # @group Binding + + # + # @return [Queue] self + # + # @api public + # @see http://bit.ly/amqp091reference AMQP 0.9.1 protocol reference (Section 1.7.2.3.) + def queue_bind(exchange, routing_key = AMQ::Protocol::EMPTY_STRING, nowait = false, arguments = nil, &block) + nowait = true unless block + exchange_name = if exchange.respond_to?(:name) + exchange.name + else + + exchange + end + + @connection.send_frame(AMQ::Protocol::Queue::Bind.encode(@channel.id, @name, exchange_name, routing_key, nowait, arguments)) + + if !nowait + self.append_callback(:bind, &block) + @channel.queues_awaiting_bind_ok.push(self) + end + + # store bindings for automatic recovery, but BE VERY CAREFUL to + # not cause an infinite rebinding loop here when we recover. MK. + binding = { :exchange => exchange_name, :routing_key => routing_key, :arguments => arguments } + @bindings.push(binding) unless @bindings.include?(binding) + + self + end + + # + # @return [Queue] self + # + # @api public + # @see http://bit.ly/amqp091reference AMQP 0.9.1 protocol reference (Section 1.7.2.5.) + def queue_unbind(exchange, routing_key = AMQ::Protocol::EMPTY_STRING, arguments = nil, &block) + exchange_name = if exchange.respond_to?(:name) + exchange.name + else + + exchange + end + + @connection.send_frame(AMQ::Protocol::Queue::Unbind.encode(@channel.id, @name, exchange_name, routing_key, arguments)) + + self.append_callback(:unbind, &block) + @channel.queues_awaiting_unbind_ok.push(self) + + + @bindings.delete_if { |b| b[:exchange] == exchange_name } + + self + end + + # @endgroup + + + + + # @group Consuming messages + + # + # @return [Queue] self + # + # @api public + # @see http://bit.ly/amqp091reference AMQP 0.9.1 protocol reference (Section 1.8.3.3.) + def basic_consume(no_ack = false, exclusive = false, nowait = false, no_local = false, arguments = nil, &block) + raise RuntimeError.new("This queue already has default consumer. Please instantiate AMQP::Consumer directly to register additional consumers.") if @default_consumer + + nowait = true unless block + @default_consumer = self.class.consumer_class.new(@channel, self, generate_consumer_tag(@name), exclusive, no_ack, arguments, no_local, &block) + @default_consumer.consume(nowait, &block) + + self + end + + # Unsubscribes from message delivery. + # @return [Queue] self + # + # @api public + # @see http://bit.ly/amqp091reference AMQP 0.9.1 protocol reference (Section 1.8.3.5.) + def cancel(nowait = false, &block) + raise "There is no default consumer for this queue. This usually means that you are trying to unsubscribe a queue that never was subscribed for messages in the first place." if @default_consumer.nil? + + @default_consumer.cancel(nowait, &block) + + self + end # cancel(&block) + + # @api public + def on_cancel(&block) + @default_consumer.on_cancel(&block) + end # on_cancel(&block) + + # @endgroup + + + + + # @group Working With Messages + + # Fetches messages from the queue. + # @return [Queue] self + # + # @api public + # @see http://bit.ly/amqp091reference AMQP 0.9.1 protocol reference (Section 1.8.3.10.) + def get(no_ack = false, &block) + @connection.send_frame(AMQ::Protocol::Basic::Get.encode(@channel.id, @name, no_ack)) + + # most people only want one callback per #get call. Consider the following example: + # + # 100.times { queue.get { ... } } + # + # most likely you won't expect 100 callback runs per message here. MK. + self.redefine_callback(:get, &block) + @channel.queues_awaiting_get_response.push(self) + + self + end # get(no_ack = false, &block) + + + + # Purges (removes all messagse from) the queue. + # @return [Queue] self + # + # @api public + # @see http://bit.ly/amqp091reference AMQP 0.9.1 protocol reference (Section 1.7.2.7.) + def queue_purge(nowait = false, &block) + nowait = true unless block + @connection.send_frame(AMQ::Protocol::Queue::Purge.encode(@channel.id, @name, nowait)) + + if !nowait + self.redefine_callback(:purge, &block) + # TODO: handle channel & connection-level exceptions + @channel.queues_awaiting_purge_ok.push(self) + end + + self + end # purge(nowait = false, &block) + + # @endgroup + + + + # @group Acknowledging & Rejecting Messages + + # Acknowledge a delivery tag. + # @return [Queue] self + # + # @api public + # @see http://bit.ly/amqp091reference AMQP 0.9.1 protocol reference (Section 1.8.3.13.) + def acknowledge(delivery_tag) + @channel.acknowledge(delivery_tag) + + self + end # acknowledge(delivery_tag) + + # + # @return [Queue] self + # + # @api public + # @see http://bit.ly/amqp091reference AMQP 0.9.1 protocol reference (Section 1.8.3.14.) + def reject(delivery_tag, requeue = true) + @channel.reject(delivery_tag, requeue) + + self + end # reject(delivery_tag, requeue = true) + + # @endgroup + + + + + # @group Error Handling & Recovery + + # Defines a callback that will be executed after TCP connection is interrupted (typically because of a network failure). + # Only one callback can be defined (the one defined last replaces previously added ones). + # + # @api public + def on_connection_interruption(&block) + self.redefine_callback(:after_connection_interruption, &block) + end # on_connection_interruption(&block) + alias after_connection_interruption on_connection_interruption + + # Defines a callback that will be executed after TCP connection is recovered after a network failure + # but before AMQP connection is re-opened. + # Only one callback can be defined (the one defined last replaces previously added ones). + # + # @api public + def before_recovery(&block) + self.redefine_callback(:before_recovery, &block) + end # before_recovery(&block) + + # @private + def run_before_recovery_callbacks + self.exec_callback_yielding_self(:before_recovery) + + @consumers.each { |tag, c| c.run_before_recovery_callbacks } + end + + + # Defines a callback that will be executed when AMQP connection is recovered after a network failure.. + # Only one callback can be defined (the one defined last replaces previously added ones). + # + # @api public + def on_recovery(&block) + self.redefine_callback(:after_recovery, &block) + end # on_recovery(&block) + alias after_recovery on_recovery + + # @private + def run_after_recovery_callbacks + self.exec_callback_yielding_self(:after_recovery) + + @consumers.each { |tag, c| c.run_after_recovery_callbacks } + end + + + + # Called by associated connection object when AMQP connection has been re-established + # (for example, after a network failure). + # + # @api plugin + def auto_recover + self.exec_callback_yielding_self(:before_recovery) + self.redeclare do + self.rebind + + @consumers.each { |tag, consumer| consumer.auto_recover } + + self.exec_callback_yielding_self(:after_recovery) + end + end # auto_recover + + # @endgroup + + + # + # Implementation + # + + + # Unique string supposed to be used as a consumer tag. + # + # @return [String] Unique string. + # @api plugin + def generate_consumer_tag(name) + "#{name}-#{Time.now.to_i * 1000}-#{Kernel.rand(999_999_999_999)}" + end + + + def handle_connection_interruption(method = nil) + @consumers.each { |tag, c| c.handle_connection_interruption(method) } + end # handle_connection_interruption(method = nil) + + + def handle_delete_ok(method) + self.exec_callback_once(:delete, method) + end # handle_delete_ok(method) + + def handle_purge_ok(method) + self.exec_callback_once(:purge, method) + end # handle_purge_ok(method) + + def handle_bind_ok(method) + self.exec_callback_once(:bind, method) + end # handle_bind_ok(method) + + def handle_unbind_ok(method) + self.exec_callback_once(:unbind, method) + end # handle_unbind_ok(method) + + def handle_get_ok(method, header, payload) + method = AMQ::Protocol::GetResponse.new(method) + self.exec_callback(:get, method, header, payload) + end # handle_get_ok(method, header, payload) + + def handle_get_empty(method) + method = AMQ::Protocol::GetResponse.new(method) + self.exec_callback(:get, method) + end # handle_get_empty(method) + + + + # Get the first queue which didn't receive Queue.Declare-Ok yet and run its declare callback. + # The cache includes only queues with {nowait: false}. + self.handle(AMQ::Protocol::Queue::DeclareOk) do |connection, frame| + method = frame.decode_payload + + channel = connection.channels[frame.channel] + queue = channel.queues_awaiting_declare_ok.shift + + queue.handle_declare_ok(method) + end + + + self.handle(AMQ::Protocol::Queue::DeleteOk) do |connection, frame| + channel = connection.channels[frame.channel] + queue = channel.queues_awaiting_delete_ok.shift + queue.handle_delete_ok(frame.decode_payload) + end + + + self.handle(AMQ::Protocol::Queue::BindOk) do |connection, frame| + channel = connection.channels[frame.channel] + queue = channel.queues_awaiting_bind_ok.shift + + queue.handle_bind_ok(frame.decode_payload) + end + + + self.handle(AMQ::Protocol::Queue::UnbindOk) do |connection, frame| + channel = connection.channels[frame.channel] + queue = channel.queues_awaiting_unbind_ok.shift + + queue.handle_unbind_ok(frame.decode_payload) + end + + + self.handle(AMQ::Protocol::Queue::PurgeOk) do |connection, frame| + channel = connection.channels[frame.channel] + queue = channel.queues_awaiting_purge_ok.shift + + queue.handle_purge_ok(frame.decode_payload) + end + + + self.handle(AMQ::Protocol::Basic::GetOk) do |connection, frame, content_frames| + channel = connection.channels[frame.channel] + queue = channel.queues_awaiting_get_response.shift + method = frame.decode_payload + + header = content_frames.shift + body = content_frames.map {|frame| frame.payload }.join + + queue.handle_get_ok(method, header, body) if queue + end + + + self.handle(AMQ::Protocol::Basic::GetEmpty) do |connection, frame| + channel = connection.channels[frame.channel] + queue = channel.queues_awaiting_get_response.shift + + queue.handle_get_empty(frame.decode_payload) end protected