# encoding: utf-8 require "amqp/entity" require "amq/protocol/get_response" require "amqp/consumer" module AMQP # h2. What are AMQP queues? # # Queues store and forward messages to consumers. They are similar to mailboxes in SMTP. # Messages flow from producing applications to {Exchange exchanges} that route them # to queues and finally queues deliver them to consumer applications (or consumer # applications fetch messages as needed). # # Note that unlike some other messaging protocols/systems, messages are not delivered directly # to queues. They are delivered to exchanges that route messages to queues using rules # knows as *bindings*. # # # h2. Concept of bindings # # Binding is an association between a queue and an exchange. # Queues must be bound to at least one exchange in order to receive messages from publishers. # Learn more about bindings in {Exchange Exchange class documentation}. # # # h2. Key methods # # Key methods of Queue class are # # * {Queue#bind} # * {Queue#subscribe} # * {Queue#pop} # * {Queue#delete} # * {Queue#purge} # * {Queue#unbind} # # # h2. Queue names. Server-named queues. Predefined queues. # # Every queue has a name that identifies it. Queue names often contain several segments separated by a dot (.), similarly to how URI # path segments are separated by a slash (/), although it may be almost any string, with some limitations (see below). # Applications may pick queue names or ask broker to generate a name for them. To do so, pass *empty string* as queue name argument. # # Here is an example: # # # # If you want to declare a queue with a particular name, for example, "images.resize", pass it to Queue class constructor: # # # # Queue names starting with 'amq.' are reserved for internal use by the broker. Attempts to declare queue with a name that violates this # rule will result in AMQP::IncompatibleOptionsError to be thrown (when # queue is re-declared on the same channel object) or channel-level exception (when originally queue # was declared on one channel and re-declaration with different attributes happens on another channel). # Learn more in {file:docs/Queues.textile Queues guide} and {file:docs/ErrorHandling.textile Error Handling guide}. # # # # h2. Queue life-cycles. When use of server-named queues is optimal and when it isn't. # # To quote AMQP 0.9.1 spec, there are two common message queue life-cycles: # # * Durable message queues that are shared by many consumers and have an independent existence: i.e. they # will continue to exist and collect messages whether or not there are consumers to receive them. # * Temporary message queues that are private to one consumer and are tied to that consumer. When the # consumer disconnects, the message queue is deleted. # # There are some variations on these, such as shared message queues that are deleted when the last of # many consumers disconnects. # # One example of durable message queues is well-known services like event collectors (event loggers). # They are usually up whether there are services to log anything or not. Other applications know what # queues they use and can rely on those queues being around all the time, survive broker restarts and # in general be available should an application in the network need to use them. In this case, # explicitly named durable queues are optimal and coupling it creates between applications is not # an issue. Another scenario of a well-known long-lived service is distributed metadata/directory/locking server # like Apache Zookeeper, Google's Chubby or DNS. Services like this benefit from using well-known, not generated # queue names, and so do other applications that use them. # # Different scenario is in "a cloud settings" when some kind of workers/instances may come online and # go down basically any time and other applications cannot rely on them being available. Using well-known # queue names in this case is possible but server-generated, short-lived queues that are bound to # topic or fanout exchanges to receive relevant messages is a better idea. # # Imagine a service that processes an endless stream of events (Twitter is one example). When traffic goes # up, development operations may spin up additional applications instances in the cloud to handle the load. # Those new instances want to subscribe to receive messages to process but the rest of the system doesn't # know anything about them, rely on them being online or try to address them directly: they process events # from a shared stream and are not different from their peers. In a case like this, there is no reason for # message consumers to not use queue names generated by the broker. # # In general, use of explicitly named or server-named queues depends on messaging pattern your application needs. # {http://www.eaipatterns.com/ Enterprise Integration Patters} discusses many messaging patterns in depth. # RabbitMQ FAQ also has a section on {http://www.rabbitmq.com/faq.html#scenarios use cases}. # # # h2. Queue durability and persistence of messages. # # Learn more in our {http://rubyamqp.info/articles/durability/}. # # # h2. Message ordering # # RabbitMQ FAQ explains {http://www.rabbitmq.com/faq.html#message-ordering ordering of messages in AMQP queues} # # # h2. Error handling # # When channel-level error occurs, queues associated with that channel are reset: internal state and callbacks # are cleared. Recommended strategy is to open a new channel and re-declare all the entities you need. # Learn more in {file:docs/ErrorHandling.textile Error Handling guide}. # # # @note Please make sure you read {http://rubyamqp.info/articles/durability/} that covers exchanges durability vs. messages # persistence. # # # @see http://files.travis-ci.org/docs/amqp/0.9.1/AMQP091Specification.pdf AMQP 0.9.1 specification (Section 2.1.1) # @see AMQP::Exchange class Queue # # Behaviours # include Entity include ServerNamedEntity extend ProtocolMethodHandlers # # API # # Name of this queue attr_reader :name # Options this queue object was instantiated with attr_accessor :opts # Channel this queue belongs to. # @return [AMQP::Channel] attr_reader :channel # @return [Array] All consumers on this queue. attr_reader :consumers # @return [AMQP::Consumer] Default consumer (registered with {Queue#consume}). attr_reader :default_consumer # @return [Hash] Additional arguments given on queue declaration. Typically used by AMQP extensions. attr_reader :arguments # @return [Array] attr_reader :bindings # @option opts [Boolean] :passive (false) If set, the server will not create the queue if it does not # already exist. The client can use this to check whether the queue # exists without modifying the server state. # # @option opts [Boolean] :durable (false) If set when creating a new queue, the queue will be marked as # durable. Durable queues remain active when a server restarts. # Non-durable queues (transient queues) are purged if/when a # server restarts. Note that durable queues do not necessarily # hold persistent messages, although it does not make sense to # send persistent messages to a transient queue (though it is # allowed). # # @option opts [Boolean] :exclusive (false) Exclusive queues may only be consumed from by the current connection. # Setting the 'exclusive' flag always implies 'auto-delete'. Only a # single consumer is allowed to remove messages from this queue. # The default is a shared queue. Multiple clients may consume messages # from this queue. # # @option opts [Boolean] :auto_delete (false) If set, the queue is deleted when all consumers have finished # using it. Last consumer can be cancelled either explicitly or because # its channel is closed. If there was no consumer ever on the queue, it # won't be deleted. # # @option opts [Boolean] :nowait (true) If set, the server will not respond to the method. The client should # not wait for a reply method. If the server could not complete the # method it will raise a channel or connection exception. # # # @option opts [Hash] :arguments (nil) A hash of optional arguments with the declaration. Some brokers implement # AMQP extensions using x-prefixed declaration arguments. For example, RabbitMQ # recognizes x-message-ttl declaration arguments that defines TTL of messages in # the queue. # # # @yield [queue, declare_ok] Yields successfully declared queue instance and AMQP method (queue.declare-ok) instance. The latter is optional. # @yieldparam [Queue] queue Queue that is successfully declared and is ready to be used. # @yieldparam [AMQ::Protocol::Queue::DeclareOk] declare_ok AMQP queue.declare-ok) instance. # # @api public def initialize(channel, name = AMQ::Protocol::EMPTY_STRING, opts = {}, &block) raise ArgumentError.new("queue name must not be nil; if you want broker to generate queue name for you, pass an empty string") if name.nil? @channel = channel @name = name unless name.empty? @server_named = name.empty? @opts = self.class.add_default_options(name, opts, block) raise ArgumentError.new("server-named queues (name = '') declaration with :nowait => true makes no sense. If you are not sure what that means, simply drop :nowait => true from opts.") if @server_named && @opts[:nowait] # a deferrable that we use to delay operations until this queue is actually declared. # one reason for this is to support a case when a server-named queue is immediately bound. # it's crazy, but 0.7.x supports it, so... MK. @declaration_deferrable = AMQP::Deferrable.new super(channel.connection) @name = name # this has to stay true even after queue.declare-ok arrives. MK. @server_named = @name.empty? if @server_named self.on_connection_interruption do # server-named queue need to get new names after recovery. MK. @name = AMQ::Protocol::EMPTY_STRING end end @channel = channel # primarily for autorecovery. MK. @bindings = Array.new @consumers = Hash.new shim = Proc.new do |q, declare_ok| case block.arity when 1 then block.call(q) else block.call(q, declare_ok) end end @channel.once_open do if @opts[:nowait] @declaration_deferrable.succeed block.call(self) if block end if block self.queue_declare(@opts[:passive], @opts[:durable], @opts[:exclusive], @opts[:auto_delete], @opts[:nowait], @opts[:arguments], &shim) else # we cannot pass :nowait as true here, AMQP::Queue will (rightfully) raise an exception because # it has no idea about crazy edge cases we are trying to support for sake of backwards compatibility. MK. self.queue_declare(@opts[:passive], @opts[:durable], @opts[:exclusive], @opts[:auto_delete], false, @opts[:arguments]) end end end # Defines a callback that will be executed once queue is declared. More than one callback can be defined. # if queue is already declared, given callback is executed immediately. # # @api public def once_declared(&block) @declaration_deferrable.callback do # guards against cases when deferred operations # don't complete before the channel is closed block.call if @channel.open? end end # once_declared(&block) # @return [Boolean] true if this queue was declared as durable (will survive broker restart). # @api public def durable? @durable end # durable? # @return [Boolean] true if this queue was declared as exclusive (limited to just one consumer) # @api public def exclusive? @exclusive end # exclusive? # @return [Boolean] true if this queue was declared as automatically deleted (deleted as soon as last consumer unbinds). # @api public def auto_delete? @auto_delete end # auto_delete? # @return [Boolean] true if this queue is server-named def server_named? @server_named end # server_named? # This method binds a queue to an exchange. Until a queue is # bound it will not receive any messages. In a classic messaging # model, store-and-forward queues are bound to a dest exchange # and subscription queues are bound to a dest_wild exchange. # # A valid exchange name (or reference) must be passed as the first # parameter. # @example Binding a queue to exchange using AMQP::Exchange instance # # ch = AMQP::Channel.new(connection) # exchange = ch.direct('backlog.events') # queue = ch.queue('', :exclusive => true) # queue.bind(exchange) # # # @example Binding a queue to exchange using exchange name # # ch = AMQP::Channel.new(connection) # queue = ch.queue('', :exclusive => true) # queue.bind('backlog.events') # # # Note that if your producer application knows consumer queue name and wants to deliver # a message there, direct exchange may be sufficient (in other words, if your code declares an exchange with # the same name as a queue and binds it to that queue, consider using the default exchange and routing key on publishing). # # @param [Exchange] Exchange to bind to. May also be a string or any object that responds to #name. # # @option opts [String] :routing_key Specifies the routing key for the binding. The routing key is # used for routing messages depending on the exchange configuration. # Not all exchanges use a routing key! Refer to the specific # exchange documentation. If the routing key is empty and the queue # name is empty, the routing key will be the current queue for the # channel, which is the last declared queue. # # @option opts [Hash] :arguments (nil) A hash of optional arguments with the declaration. Headers exchange type uses these metadata # attributes for routing matching. # In addition, brokers may implement AMQP extensions using x-prefixed declaration arguments. # # @option opts [Boolean] :nowait (true) If set, the server will not respond to the method. The client should # not wait for a reply method. If the server could not complete the # method it will raise a channel or connection exception. # @return [Queue] Self # # # @yield [] Since queue.bind-ok carries no attributes, no parameters are yielded to the block. # # @api public # @see Queue#unbind def bind(exchange, opts = {}, &block) @channel.once_open do self.once_name_is_available do queue_bind(exchange, (opts[:key] || opts[:routing_key] || AMQ::Protocol::EMPTY_STRING), (opts[:nowait] || block.nil?), opts[:arguments], &block) end end self end # @group Error Handling and Recovery # Used by automatic recovery machinery. # @private # @api plugin def rebind(&block) @bindings.each { |b| self.bind(b[:exchange], b) } end # Called by associated connection object when AMQP connection has been re-established # (for example, after a network failure). # # @api plugin def auto_recover self.exec_callback_yielding_self(:before_recovery) if self.server_named? old_name = @name.dup @name = AMQ::Protocol::EMPTY_STRING @channel.queues.delete(old_name) end self.redeclare do self.rebind @consumers.each { |tag, consumer| consumer.auto_recover } self.exec_callback_yielding_self(:after_recovery) end end # auto_recover # @endgroup # Remove the binding between the queue and exchange. The queue will # not receive any more messages until it is bound to another # exchange. # # Due to the asynchronous nature of the protocol, it is possible for # "in flight" messages to be received after this call completes. # Those messages will be serviced by the last block used in a # {Queue#subscribe} or {Queue#pop} call. # # @param [Exchange] Exchange to unbind from. # @option opts [String] :routing_key Binding routing key # @option opts [Hash] :arguments Binding arguments # @option opts [Boolean] :nowait (true) If set, the server will not respond to the method. The client should # not wait for a reply method. If the server could not complete the # method it will raise a channel or connection exception. # # # @yield [] Since queue.unbind-ok carries no attributes, no parameters are yielded to the block. # # @api public # @see Queue#bind def unbind(exchange, opts = {}, &block) @channel.once_open do self.once_name_is_available do queue_unbind(exchange, (opts[:key] || opts[:routing_key] || AMQ::Protocol::EMPTY_STRING), opts[:arguments], &block) end end end # This method deletes a queue. When a queue is deleted any pending # messages are sent to a dead-letter queue if this is defined in the # server configuration, and all consumers on the queue are cancelled. # # @return [NilClass] nil (for v0.7 compatibility) # # @option opts [Boolean] :if_unused (false) If set, the server will only delete the queue if it has no # consumers. If the queue has consumers the server does does not # delete it but raises a channel exception instead. # # @option opts [Boolean] :if_empty (false) If set, the server will only delete the queue if it has no # messages. If the queue is not empty the server raises a channel # exception. # # @option opts [Boolean] :nowait (false) If set, the server will not respond to the method. The client should # not wait for a reply method. If the server could not complete the # method it will raise a channel or connection exception. # # # @return [NilClass] nil (for v0.7 compatibility) # # @yield [delete_ok] Yields AMQP method (queue.delete-ok) instance. # @yieldparam [AMQ::Protocol::Queue::DeleteOk] delete_ok AMQP queue.delete-ok) instance. Carries number of messages that were in the queue. # # @api public # @see Queue#purge # @see Queue#unbind def delete(opts = {}, &block) @channel.once_open do self.once_name_is_available do queue_delete(opts.fetch(:if_unused, false), opts.fetch(:if_empty, false), opts.fetch(:nowait, false), &block) end end # backwards compatibility nil end # This method removes all messages from a queue which are not awaiting acknowledgment. # # @option opts [Boolean] :nowait (false) If set, the server will not respond to the method. The client should # not wait for a reply method. If the server could not complete the # method it will raise a channel or connection exception. # # @return [NilClass] nil (for v0.7 compatibility) # # # @yield [purge_ok] Yields AMQP method (queue.purge-ok) instance. # @yieldparam [AMQ::Protocol::Queue::PurgeOk] purge_ok AMQP queue.purge-ok) instance. Carries number of messages that were purged. # # @api public # @see Queue#delete # @see Queue#unbind def purge(opts = {}, &block) @channel.once_open do self.once_declared do queue_purge(opts.fetch(:nowait, false), &block) end end # backwards compatibility nil end # This method provides a direct access to the messages in a queue # using a synchronous dialogue that is designed for specific types of # application where synchronous functionality is more important than # performance. # # If queue is empty, `payload` callback argument will be nil, otherwise arguments # are identical to those of {AMQP::Queue#subscribe} callback. # # @example Fetching messages off AMQP queue on demand # # queue.pop do |metadata, payload| # if payload # puts "Fetched a message: #{payload.inspect}, content_type: #{metadata.content_type}. Shutting down..." # else # puts "No messages in the queue" # end # end # # @option opts [Boolean] :ack (false) If this field is set to false the server does not expect acknowledgments # for messages. That is, when a message is delivered to the client # the server automatically and silently acknowledges it on behalf # of the client. This functionality increases performance but at # the cost of reliability. Messages can get lost if a client dies # before it can deliver them to the application. # # # @return [Qeueue] Self # # # @yield [headers, payload] When block only takes one argument, yields payload to it. In case of two arguments, yields headers and payload. # @yieldparam [AMQP::Header] headers Headers (metadata) associated with this message (for example, routing key). # @yieldparam [String] payload Message body (content). On Ruby 1.9, you may want to check or enforce content encoding. # # @api public def pop(opts = {}, &block) if block # We have to maintain this multiple arities jazz # because older versions this gem are used in examples in at least 3 # books published by O'Reilly :(. MK. shim = Proc.new { |method, headers, payload| case block.arity when 1 then block.call(payload) when 2 then h = Header.new(@channel, method, headers ? headers.decode_payload : nil) block.call(h, payload) else h = Header.new(@channel, method, headers ? headers.decode_payload : nil) block.call(h, payload, method.delivery_tag, method.redelivered, method.exchange, method.routing_key) end } @channel.once_open do self.once_name_is_available do # see AMQP::Queue#get in amq-client self.get(!opts.fetch(:ack, false), &shim) end end else @channel.once_open do self.once_name_is_available do self.get(!opts.fetch(:ack, false)) end end end end # Subscribes to asynchronous message delivery. # # The provided block is passed a single message each time the # exchange matches a message to this queue. # # # Attempts to {Queue#subscribe} multiple times to the same exchange will raise an # Exception. If you need more than one consumer per queue, use {AMQP::Consumer} instead. # {file:docs/Queues.textile Documentation guide on queues} explains this and other topics # in great detail. # # # @example Use of callback with a single argument # # EventMachine.run do # exchange = AMQP::Channel.direct("foo queue") # EM.add_periodic_timer(1) do # exchange.publish("random number #{rand(1000)}") # end # # queue = AMQP::Channel.queue('foo queue') # queue.subscribe { |body| puts "received payload [#{body}]" } # end # # If the block takes 2 parameters, both the header and the body will # be passed in for processing. # # @example Use of callback with two arguments # # EventMachine.run do # connection = AMQP.connect(:host => '127.0.0.1') # puts "Connected to AMQP broker. Running #{AMQP::VERSION} version of the gem..." # # channel = AMQP::Channel.new(connection) # queue = channel.queue("amqpgem.examples.hello_world", :auto_delete => true) # exchange = channel.direct("amq.direct") # # queue.bind(exchange) # # channel.on_error do |ch, channel_close| # puts channel_close.reply_text # connection.close { EventMachine.stop } # end # # queue.subscribe do |metadata, payload| # puts "metadata.routing_key : #{metadata.routing_key}" # puts "metadata.content_type: #{metadata.content_type}" # puts "metadata.priority : #{metadata.priority}" # puts "metadata.headers : #{metadata.headers.inspect}" # puts "metadata.timestamp : #{metadata.timestamp.inspect}" # puts "metadata.type : #{metadata.type}" # puts "metadata.delivery_tag: #{metadata.delivery_tag}" # puts "metadata.redelivered : #{metadata.redelivered}" # # puts "metadata.app_id : #{metadata.app_id}" # puts "metadata.exchange : #{metadata.exchange}" # puts # puts "Received a message: #{payload}. Disconnecting..." # # connection.close { # EventMachine.stop { exit } # } # end # # exchange.publish("Hello, world!", # :app_id => "amqpgem.example", # :priority => 8, # :type => "kinda.checkin", # # headers table keys can be anything # :headers => { # :coordinates => { # :latitude => 59.35, # :longitude => 18.066667 # }, # :participants => 11, # :venue => "Stockholm" # }, # :timestamp => Time.now.to_i) # end # # @example Using object as consumer (message handler), take one # # class Consumer # # # # # API # # # # def initialize(channel, queue_name = AMQ::Protocol::EMPTY_STRING) # @queue_name = queue_name # # @channel = channel # # Consumer#handle_channel_exception will handle channel # # exceptions. Keep in mind that you can only register one error handler, # # so the last one registered "wins". # @channel.on_error(&method(:handle_channel_exception)) # end # initialize # # def start # @queue = @channel.queue(@queue_name, :exclusive => true) # # #handle_message method will be handling messages routed to @queue # @queue.subscribe(&method(:handle_message)) # end # start # # # # # # # Implementation # # # # def handle_message(metadata, payload) # puts "Received a message: #{payload}, content_type = #{metadata.content_type}" # end # handle_message(metadata, payload) # # def handle_channel_exception(channel, channel_close) # puts "Oops... a channel-level exception: code = #{channel_close.reply_code}, message = #{channel_close.reply_text}" # end # handle_channel_exception(channel, channel_close) # end # # # @example Using object as consumer (message handler), take two: aggregatied handler # class Consumer # # # # # API # # # # def handle_message(metadata, payload) # puts "Received a message: #{payload}, content_type = #{metadata.content_type}" # end # handle_message(metadata, payload) # end # # # class Worker # # # # # API # # # # # def initialize(channel, queue_name = AMQ::Protocol::EMPTY_STRING, consumer = Consumer.new) # @queue_name = queue_name # # @channel = channel # @channel.on_error(&method(:handle_channel_exception)) # # @consumer = consumer # end # initialize # # def start # @queue = @channel.queue(@queue_name, :exclusive => true) # @queue.subscribe(&@consumer.method(:handle_message)) # end # start # # # # # # # Implementation # # # # def handle_channel_exception(channel, channel_close) # puts "Oops... a channel-level exception: code = #{channel_close.reply_code}, message = #{channel_close.reply_text}" # end # handle_channel_exception(channel, channel_close) # end # # @example Unit-testing objects that are used as consumers, RSpec style # # require "ostruct" # require "json" # # # RSpec example # describe Consumer do # describe "when a new message arrives" do # subject { described_class.new } # # let(:metadata) do # o = OpenStruct.new # # o.content_type = "application/json" # o # end # let(:payload) { JSON.encode({ :command => "reload_config" }) } # # it "does some useful work" do # # check preconditions here if necessary # # subject.handle_message(metadata, payload) # # # add your code expectations here # end # end # end # # # # @option opts [Boolean ]:ack (false) If this field is set to false the server does not expect acknowledgments # for messages. That is, when a message is delivered to the client # the server automatically and silently acknowledges it on behalf # of the client. This functionality increases performance but at # the cost of reliability. Messages can get lost if a client dies # before it can deliver them to the application. # # @option opts [Boolean] :nowait (false) If set, the server will not respond to the method. The client should # not wait for a reply method. If the server could not complete the # method it will raise a channel or connection exception. # # @option opts [#call] :confirm (nil) If set, this proc will be called when the server confirms subscription # to the queue with a basic.consume-ok message. Setting this option will # automatically set :nowait => false. This is required for the server # to send a confirmation. # # @option opts [Boolean] :exclusive (false) Request exclusive consumer access, meaning only this consumer can access the queue. # This is useful when you want a long-lived shared queue to be temporarily accessible by just # one application (or thread, or process). If application exclusive consumer is part of crashes # or loses network connection to the broker, channel is closed and exclusive consumer is thus cancelled. # # # @yield [headers, payload] When block only takes one argument, yields payload to it. In case of two arguments, yields headers and payload. # @yieldparam [AMQP::Header] headers Headers (metadata) associated with this message (for example, routing key). # @yieldparam [String] payload Message body (content). On Ruby 1.9, you may want to check or enforce content encoding. # # @return [Queue] Self # @api public # # @see file:docs/Queues.textile Documentation guide on queues # @see #unsubscribe # @see AMQP::Consumer def subscribe(opts = {}, &block) raise RuntimeError.new("This queue already has default consumer. Please instantiate AMQP::Consumer directly to register additional consumers.") if @default_consumer opts[:nowait] = false if (@on_confirm_subscribe = opts[:confirm]) @channel.once_open do self.once_name_is_available do # guards against a pathological case race condition when a channel # is opened and closed before delayed operations are completed. self.basic_consume(!opts[:ack], opts[:exclusive], (opts[:nowait] || block.nil?), opts[:no_local], nil, &opts[:confirm]) self.on_delivery(&block) end end self end # @api public # @see http://bit.ly/amqp091reference AMQP 0.9.1 protocol reference (Sections 1.8.3.9) def on_delivery(&block) @default_consumer.on_delivery(&block) end # on_delivery(&block) # @return [String] Consumer tag of the default consumer associated with this queue (if any), or nil # @note Default consumer is the one registered with the convenience {AMQP::Queue#subscribe} method. It has no special properties of any kind. # @see Queue#subscribe # @see AMQP::Consumer # @api public def consumer_tag if @default_consumer @default_consumer.consumer_tag else nil end end # consumer_tag # @return [AMQP::Consumer] Default consumer associated with this queue (if any), or nil # @note Default consumer is the one registered with the convenience {AMQP::Queue#subscribe} method. It has no special properties of any kind. # @see Queue#subscribe # @see AMQP::Consumer # @api public def default_consumer @default_consumer end # @return [Class] # @private def self.consumer_class AMQP::Consumer end # self.consumer_class # Removes the subscription from the queue and cancels the consumer. Once consumer is cancelled, # messages will no longer be delivered to it, however, due to the asynchronous nature of the protocol, it is possible for # “in flight” messages to be received after this call completes. # Those messages will be serviced by the last block used in a # {Queue#subscribe} or {Queue#pop} call. # # Fetching messages with {AMQP::Queue#pop} is still possible even after consumer is cancelled. # # Additionally, if the queue was created with _autodelete_ set to # true, the server will delete the queue after its wait period # has expired unless the queue is bound to an active exchange. # # The method accepts a block which will be executed when the # unsubscription request is acknowledged as complete by the server. # # @option opts [Boolean] :nowait (true) If set, the server will not respond to the method. The client should # not wait for a reply method, the callback (if passed) will be ignored. If the server could not complete the # method it will raise a channel or connection exception. # # @yield [cancel_ok] # @yieldparam [AMQ::Protocol::Basic::CancelOk] cancel_ok AMQP method basic.cancel-ok. You can obtain consumer tag from it. # # # @api public def unsubscribe(opts = {}, &block) @channel.once_open do self.once_name_is_available do if @default_consumer @default_consumer.cancel(opts.fetch(:nowait, true), &block); @default_consumer = nil end end end end # Get the number of messages and active consumers (with active channel flow) on a queue. # # @example Getting number of messages and active consumers for a queue # # AMQP::Channel.queue('name').status { |number_of_messages, number_of_active_consumers| # puts number_of_messages # } # # @yield [number_of_messages, number_of_active_consumers] # @yieldparam [Fixnum] number_of_messages Number of messages in the queue # @yieldparam [Fixnum] number_of_active_consumers Number of active consumers for the queue. Note that consumers can suspend activity (Channel.Flow) in which case they do not appear in this count. # # @api public def status(opts = {}, &block) raise ArgumentError, "AMQP::Queue#status does not make any sense without a block" unless block shim = Proc.new { |q, declare_ok| block.call(declare_ok.message_count, declare_ok.consumer_count) } @channel.once_open do self.once_name_is_available do # we do not use self.declare here to avoid caching of @passive since that will cause unexpected side-effects during automatic # recovery process. MK. @connection.send_frame(AMQ::Protocol::Queue::Declare.encode(@channel.id, @name, true, @opts[:durable], @opts[:exclusive], @opts[:auto_delete], false, @opts[:arguments])) self.append_callback(:declare, &shim) @channel.queues_awaiting_declare_ok.push(self) end end self end # Boolean check to see if the current queue has already subscribed # to messages delivery (has default consumer). # # Attempts to {Queue#subscribe} multiple times to the same exchange will raise an # Exception. If you need more than one consumer per queue, use {AMQP::Consumer} instead. # # @return [Boolean] true if there is a consumer tag associated with this Queue instance # @api public # @deprecated def subscribed? @default_consumer && @default_consumer.subscribed? end # Compatibility alias for #on_declare. # # @api public # @deprecated def callback return nil if !subscribed? @default_consumer.callback end # Don't use this method. It is a leftover from very early days and # it ruins the whole point of exchanges/queue separation. # # @note This method will be removed before 1.0 release # @deprecated # @api public def publish(data, opts = {}) exchange.publish(data, opts.merge(:routing_key => self.name)) end # Resets queue state. Useful for error handling. # @api plugin def reset initialize(@channel, @name, @opts) end # @private # @api plugin def handle_connection_interruption(method = nil) @consumers.each { |tag, consumer| consumer.handle_connection_interruption(method) } self.exec_callback_yielding_self(:after_connection_interruption) @declaration_deferrable = EventMachine::DefaultDeferrable.new end def handle_declare_ok(method) @name = method.queue if @name.empty? @channel.register_queue(self) self.exec_callback_once_yielding_self(:declare, method) @declaration_deferrable.succeed end # @group Declaration # Declares this queue. # # # @return [Queue] self # # @api public # @see http://bit.ly/amqp091reference AMQP 0.9.1 protocol reference (Section 1.7.2.1.) def queue_declare(passive = false, durable = false, exclusive = false, auto_delete = false, nowait = false, arguments = nil, &block) raise ArgumentError, "declaration with nowait does not make sense for server-named queues! Either specify name other than empty string or use #declare without nowait" if nowait && self.anonymous? # these two are for autorecovery. MK. @passive = passive @server_named = @name.empty? @durable = durable @exclusive = exclusive @auto_delete = auto_delete @arguments = arguments nowait = true if !block && !@name.empty? && nowait.nil? @connection.send_frame(AMQ::Protocol::Queue::Declare.encode(@channel.id, @name, passive, durable, exclusive, auto_delete, nowait, arguments)) if !nowait self.append_callback(:declare, &block) @channel.queues_awaiting_declare_ok.push(self) end self end # Re-declares queue with the same attributes # @api public def redeclare(&block) nowait = true if !block && !@name.empty? # server-named queues get their new generated names. new_name = if @server_named AMQ::Protocol::EMPTY_STRING else @name end @connection.send_frame(AMQ::Protocol::Queue::Declare.encode(@channel.id, new_name, @passive, @durable, @exclusive, @auto_delete, false, @arguments)) if !nowait self.append_callback(:declare, &block) @channel.queues_awaiting_declare_ok.push(self) end self end # @endgroup # Deletes this queue. # # @param [Boolean] if_unused delete only if queue has no consumers (subscribers). # @param [Boolean] if_empty delete only if queue has no messages in it. # @param [Boolean] nowait Don't wait for reply from broker. # @return [Queue] self # # @api public # @see http://bit.ly/amqp091reference AMQP 0.9.1 protocol reference (Section 1.7.2.9.) def queue_delete(if_unused = false, if_empty = false, nowait = false, &block) nowait = true unless block @connection.send_frame(AMQ::Protocol::Queue::Delete.encode(@channel.id, @name, if_unused, if_empty, nowait)) if !nowait self.append_callback(:delete, &block) # TODO: delete itself from queues cache @channel.queues_awaiting_delete_ok.push(self) end self end # delete(channel, queue, if_unused, if_empty, nowait, &block) # @group Binding # # @return [Queue] self # # @api public # @see http://bit.ly/amqp091reference AMQP 0.9.1 protocol reference (Section 1.7.2.3.) def queue_bind(exchange, routing_key = AMQ::Protocol::EMPTY_STRING, nowait = false, arguments = nil, &block) nowait = true unless block exchange_name = if exchange.respond_to?(:name) exchange.name else exchange end @connection.send_frame(AMQ::Protocol::Queue::Bind.encode(@channel.id, @name, exchange_name, routing_key, nowait, arguments)) if !nowait self.append_callback(:bind, &block) @channel.queues_awaiting_bind_ok.push(self) end # store bindings for automatic recovery, but BE VERY CAREFUL to # not cause an infinite rebinding loop here when we recover. MK. binding = { :exchange => exchange_name, :routing_key => routing_key, :arguments => arguments } @bindings.push(binding) unless @bindings.include?(binding) self end # # @return [Queue] self # # @api public # @see http://bit.ly/amqp091reference AMQP 0.9.1 protocol reference (Section 1.7.2.5.) def queue_unbind(exchange, routing_key = AMQ::Protocol::EMPTY_STRING, arguments = nil, &block) exchange_name = if exchange.respond_to?(:name) exchange.name else exchange end @connection.send_frame(AMQ::Protocol::Queue::Unbind.encode(@channel.id, @name, exchange_name, routing_key, arguments)) self.append_callback(:unbind, &block) @channel.queues_awaiting_unbind_ok.push(self) @bindings.delete_if { |b| b[:exchange] == exchange_name } self end # @endgroup # @group Consuming messages # # @return [Queue] self # # @api public # @see http://bit.ly/amqp091reference AMQP 0.9.1 protocol reference (Section 1.8.3.3.) def basic_consume(no_ack = false, exclusive = false, nowait = false, no_local = false, arguments = nil, &block) raise RuntimeError.new("This queue already has default consumer. Please instantiate AMQP::Consumer directly to register additional consumers.") if @default_consumer nowait = true unless block @default_consumer = self.class.consumer_class.new(@channel, self, generate_consumer_tag(@name), exclusive, no_ack, arguments, no_local, &block) @default_consumer.consume(nowait, &block) self end # Unsubscribes from message delivery. # @return [Queue] self # # @api public # @see http://bit.ly/amqp091reference AMQP 0.9.1 protocol reference (Section 1.8.3.5.) def cancel(nowait = false, &block) raise "There is no default consumer for this queue. This usually means that you are trying to unsubscribe a queue that never was subscribed for messages in the first place." if @default_consumer.nil? @default_consumer.cancel(nowait, &block) self end # cancel(&block) # @api public def on_cancel(&block) @default_consumer.on_cancel(&block) end # on_cancel(&block) # @endgroup # @group Working With Messages # Fetches messages from the queue. # @return [Queue] self # # @api public # @see http://bit.ly/amqp091reference AMQP 0.9.1 protocol reference (Section 1.8.3.10.) def get(no_ack = false, &block) @connection.send_frame(AMQ::Protocol::Basic::Get.encode(@channel.id, @name, no_ack)) # most people only want one callback per #get call. Consider the following example: # # 100.times { queue.get { ... } } # # most likely you won't expect 100 callback runs per message here. MK. self.redefine_callback(:get, &block) @channel.queues_awaiting_get_response.push(self) self end # get(no_ack = false, &block) # Purges (removes all messagse from) the queue. # @return [Queue] self # # @api public # @see http://bit.ly/amqp091reference AMQP 0.9.1 protocol reference (Section 1.7.2.7.) def queue_purge(nowait = false, &block) nowait = true unless block @connection.send_frame(AMQ::Protocol::Queue::Purge.encode(@channel.id, @name, nowait)) if !nowait self.redefine_callback(:purge, &block) # TODO: handle channel & connection-level exceptions @channel.queues_awaiting_purge_ok.push(self) end self end # purge(nowait = false, &block) # @endgroup # @group Acknowledging & Rejecting Messages # Acknowledge a delivery tag. # @return [Queue] self # # @api public # @see http://bit.ly/amqp091reference AMQP 0.9.1 protocol reference (Section 1.8.3.13.) def acknowledge(delivery_tag) @channel.acknowledge(delivery_tag) self end # acknowledge(delivery_tag) # # @return [Queue] self # # @api public # @see http://bit.ly/amqp091reference AMQP 0.9.1 protocol reference (Section 1.8.3.14.) def reject(delivery_tag, requeue = true) @channel.reject(delivery_tag, requeue) self end # reject(delivery_tag, requeue = true) # @endgroup # @group Error Handling & Recovery # Defines a callback that will be executed after TCP connection is interrupted (typically because of a network failure). # Only one callback can be defined (the one defined last replaces previously added ones). # # @api public def on_connection_interruption(&block) self.redefine_callback(:after_connection_interruption, &block) end # on_connection_interruption(&block) alias after_connection_interruption on_connection_interruption # Defines a callback that will be executed after TCP connection is recovered after a network failure # but before AMQP connection is re-opened. # Only one callback can be defined (the one defined last replaces previously added ones). # # @api public def before_recovery(&block) self.redefine_callback(:before_recovery, &block) end # before_recovery(&block) # @private def run_before_recovery_callbacks self.exec_callback_yielding_self(:before_recovery) @consumers.each { |tag, c| c.run_before_recovery_callbacks } end # Defines a callback that will be executed when AMQP connection is recovered after a network failure.. # Only one callback can be defined (the one defined last replaces previously added ones). # # @api public def on_recovery(&block) self.redefine_callback(:after_recovery, &block) end # on_recovery(&block) alias after_recovery on_recovery # @private def run_after_recovery_callbacks self.exec_callback_yielding_self(:after_recovery) @consumers.each { |tag, c| c.run_after_recovery_callbacks } end # Called by associated connection object when AMQP connection has been re-established # (for example, after a network failure). # # @api plugin def auto_recover self.exec_callback_yielding_self(:before_recovery) self.redeclare do self.rebind @consumers.each { |tag, consumer| consumer.auto_recover } self.exec_callback_yielding_self(:after_recovery) end end # auto_recover # @endgroup # # Implementation # # Unique string supposed to be used as a consumer tag. # # @return [String] Unique string. # @api plugin def generate_consumer_tag(name) "#{name}-#{Time.now.to_i * 1000}-#{Kernel.rand(999_999_999_999)}" end def handle_connection_interruption(method = nil) @consumers.each { |tag, c| c.handle_connection_interruption(method) } end # handle_connection_interruption(method = nil) def handle_delete_ok(method) self.exec_callback_once(:delete, method) end # handle_delete_ok(method) def handle_purge_ok(method) self.exec_callback_once(:purge, method) end # handle_purge_ok(method) def handle_bind_ok(method) self.exec_callback_once(:bind, method) end # handle_bind_ok(method) def handle_unbind_ok(method) self.exec_callback_once(:unbind, method) end # handle_unbind_ok(method) def handle_get_ok(method, header, payload) method = AMQ::Protocol::GetResponse.new(method) self.exec_callback(:get, method, header, payload) end # handle_get_ok(method, header, payload) def handle_get_empty(method) method = AMQ::Protocol::GetResponse.new(method) self.exec_callback(:get, method) end # handle_get_empty(method) # Get the first queue which didn't receive Queue.Declare-Ok yet and run its declare callback. # The cache includes only queues with {nowait: false}. self.handle(AMQ::Protocol::Queue::DeclareOk) do |connection, frame| method = frame.decode_payload channel = connection.channels[frame.channel] queue = channel.queues_awaiting_declare_ok.shift queue.handle_declare_ok(method) end self.handle(AMQ::Protocol::Queue::DeleteOk) do |connection, frame| channel = connection.channels[frame.channel] queue = channel.queues_awaiting_delete_ok.shift queue.handle_delete_ok(frame.decode_payload) end self.handle(AMQ::Protocol::Queue::BindOk) do |connection, frame| channel = connection.channels[frame.channel] queue = channel.queues_awaiting_bind_ok.shift queue.handle_bind_ok(frame.decode_payload) end self.handle(AMQ::Protocol::Queue::UnbindOk) do |connection, frame| channel = connection.channels[frame.channel] queue = channel.queues_awaiting_unbind_ok.shift queue.handle_unbind_ok(frame.decode_payload) end self.handle(AMQ::Protocol::Queue::PurgeOk) do |connection, frame| channel = connection.channels[frame.channel] queue = channel.queues_awaiting_purge_ok.shift queue.handle_purge_ok(frame.decode_payload) end self.handle(AMQ::Protocol::Basic::GetOk) do |connection, frame, content_frames| channel = connection.channels[frame.channel] queue = channel.queues_awaiting_get_response.shift method = frame.decode_payload header = content_frames.shift body = content_frames.map {|frame| frame.payload }.join queue.handle_get_ok(method, header, body) if queue end self.handle(AMQ::Protocol::Basic::GetEmpty) do |connection, frame| channel = connection.channels[frame.channel] queue = channel.queues_awaiting_get_response.shift queue.handle_get_empty(frame.decode_payload) end protected # @private def self.add_default_options(name, opts, block) { :queue => name, :nowait => (block.nil? && !name.empty?) }.merge(opts) end def once_name_is_available(&block) if server_named? self.once_declared do block.call end else block.call end end private # Default direct exchange that we use to publish messages directly to this queue. # This is a leftover from very early days and will be removed before version 1.0. # # @deprecated def exchange @exchange ||= Exchange.new(@channel, :direct, AMQ::Protocol::EMPTY_STRING, :key => name) end end # Queue end # AMQP