# encoding: utf-8
require "amqp/entity"
require "amq/protocol/get_response"
require "amqp/consumer"
module AMQP
# h2. What are AMQP queues?
#
# Queues store and forward messages to consumers. They are similar to mailboxes in SMTP.
# Messages flow from producing applications to {Exchange exchanges} that route them
# to queues and finally queues deliver them to consumer applications (or consumer
# applications fetch messages as needed).
#
# Note that unlike some other messaging protocols/systems, messages are not delivered directly
# to queues. They are delivered to exchanges that route messages to queues using rules
# known as *bindings*.
#
#
# h2. Concept of bindings
#
# A binding is an association between a queue and an exchange.
# A queue must be bound to at least one exchange in order to receive messages from publishers.
# Learn more about bindings in {Exchange Exchange class documentation}.
#
#
# h2. Key methods
#
# Key methods of the Queue class are:
#
# * {Queue#bind}
# * {Queue#subscribe}
# * {Queue#pop}
# * {Queue#delete}
# * {Queue#purge}
# * {Queue#unbind}
#
#
# h2. Queue names. Server-named queues. Predefined queues.
#
# Every queue has a name that identifies it. Queue names often contain several segments separated by a dot (.), similar to how URI
# path segments are separated by a slash (/), although a name may be almost any string, with some limitations (see below).
# Applications may pick queue names or ask the broker to generate a name for them. To do the latter, pass an *empty string* as the queue name argument.
#
# Here is an example:
#
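# @example Declaring a server-named queue (minimal sketch)
#
#   # Illustration only. Assumes `connection` is an open AMQP connection
#   # inside a running EventMachine reactor.
#   channel = AMQP::Channel.new(connection)
#   # an empty string asks the broker to generate the queue name
#   channel.queue("", :exclusive => true) do |queue, declare_ok|
#     puts "Broker generated queue name: #{queue.name}"
#   end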
#
#
# If you want to declare a queue with a particular name, for example, "images.resize", pass it to the Queue class constructor:
#
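# @example Declaring a queue with an explicit name (minimal sketch)
#
#   # Illustration only. Assumes `channel` is an open AMQP::Channel;
#   # the :auto_delete option is just an example of passing options.
#   queue = AMQP::Queue.new(channel, "images.resize", :auto_delete => true)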
#
#
# Queue names starting with 'amq.' are reserved for internal use by the broker. Attempts to declare a queue with a name that violates this
# rule will result in AMQP::IncompatibleOptionsError being raised (when the
# queue is re-declared on the same channel object) or in a channel-level exception (when the queue
# was originally declared on one channel and re-declaration with different attributes happens on another channel).
# Learn more in {file:docs/Queues.textile Queues guide} and {file:docs/ErrorHandling.textile Error Handling guide}.
#
#
#
# h2. Queue life-cycles. When use of server-named queues is optimal and when it isn't.
#
# To quote AMQP 0.9.1 spec, there are two common message queue life-cycles:
#
# * Durable message queues that are shared by many consumers and have an independent existence: i.e. they
# will continue to exist and collect messages whether or not there are consumers to receive them.
# * Temporary message queues that are private to one consumer and are tied to that consumer. When the
# consumer disconnects, the message queue is deleted.
#
# There are some variations on these, such as shared message queues that are deleted when the last of
# many consumers disconnects.
#
# One example of durable message queues is well-known services such as event collectors (event loggers).
# They are usually up whether or not there are services that need to log anything. Other applications know what
# queues they use and can rely on those queues being around all the time, surviving broker restarts and
# in general being available should an application in the network need to use them. In this case,
# explicitly named durable queues are optimal and the coupling they create between applications is not
# an issue. Another example of a well-known long-lived service is a distributed metadata/directory/locking server
# like Apache Zookeeper, Google's Chubby or DNS. Services like this benefit from using well-known, not generated,
# queue names, and so do other applications that use them.
#
# A different scenario is a "cloud setting", where workers/instances of some kind may come online and
# go down at basically any time and other applications cannot rely on them being available. Using well-known
# queue names in this case is possible, but server-generated, short-lived queues that are bound to
# topic or fanout exchanges to receive relevant messages are a better idea.
#
# Imagine a service that processes an endless stream of events (Twitter is one example). When traffic goes
# up, the operations team may spin up additional application instances in the cloud to handle the load.
# Those new instances want to subscribe to receive messages to process, but the rest of the system does not
# know anything about them, cannot rely on them being online and does not try to address them directly: they process events
# from a shared stream and are no different from their peers. In a case like this, there is no reason for
# message consumers not to use queue names generated by the broker.
#
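# @example Elastic workers consuming a shared stream via server-named queues (minimal sketch)
#
#   # Illustration only. Assumes `connection` is an open AMQP connection;
#   # the exchange name "events.stream" is hypothetical.
#   channel  = AMQP::Channel.new(connection)
#   exchange = channel.fanout("events.stream")
#   # every worker instance gets its own broker-named, exclusive queue
#   channel.queue("", :exclusive => true) do |queue|
#     queue.bind(exchange).subscribe do |metadata, payload|
#       puts "#{queue.name} received #{payload}"
#     end
#   end
#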
# In general, use of explicitly named or server-named queues depends on the messaging pattern your application needs.
# {http://www.eaipatterns.com/ Enterprise Integration Patterns} discusses many messaging patterns in depth.
# The RabbitMQ FAQ also has a section on {http://www.rabbitmq.com/faq.html#scenarios use cases}.
#
#
# h2. Queue durability and persistence of messages.
#
# Learn more in our {http://rubyamqp.info/articles/durability/ Durability guide}.
#
#
# h2. Message ordering
#
# The RabbitMQ FAQ explains {http://www.rabbitmq.com/faq.html#message-ordering ordering of messages in AMQP queues}.
#
#
# h2. Error handling
#
# When a channel-level error occurs, queues associated with that channel are reset: internal state and callbacks
# are cleared. The recommended strategy is to open a new channel and re-declare all the entities you need.
# Learn more in {file:docs/ErrorHandling.textile Error Handling guide}.
#
#
# @note Please make sure you read the {http://rubyamqp.info/articles/durability/ Durability guide}, which covers exchange durability vs. message
# persistence.
#
#
# @see http://files.travis-ci.org/docs/amqp/0.9.1/AMQP091Specification.pdf AMQP 0.9.1 specification (Section 2.1.1)
# @see AMQP::Exchange
class Queue
#
# Behaviours
#
include Entity
include ServerNamedEntity
extend ProtocolMethodHandlers
#
# API
#
# Name of this queue
attr_reader :name
# Options this queue object was instantiated with
attr_accessor :opts
# Channel this queue belongs to.
# @return [AMQP::Channel]
attr_reader :channel
# @return [Array] All consumers on this queue.
attr_reader :consumers
# @return [AMQP::Consumer] Default consumer (registered with {Queue#subscribe}).
attr_reader :default_consumer
# @return [Hash] Additional arguments given on queue declaration. Typically used by AMQP extensions.
attr_reader :arguments
# @return [Array]
attr_reader :bindings
# @option opts [Boolean] :passive (false) If set, the server will not create the queue if it does not
# already exist. The client can use this to check whether the queue
# exists without modifying the server state.
#
# @option opts [Boolean] :durable (false) If set when creating a new queue, the queue will be marked as
# durable. Durable queues remain active when a server restarts.
# Non-durable queues (transient queues) are purged if/when a
# server restarts. Note that durable queues do not necessarily
# hold persistent messages, although it does not make sense to
# send persistent messages to a transient queue (though it is
# allowed).
#
# @option opts [Boolean] :exclusive (false) Exclusive queues may only be consumed from by the current connection.
# Setting the 'exclusive' flag always implies 'auto-delete'. Only a
# single consumer is allowed to remove messages from an exclusive queue.
# The default is a shared queue: multiple clients may consume messages
# from it.
#
# @option opts [Boolean] :auto_delete (false) If set, the queue is deleted when all consumers have finished
# using it. Last consumer can be cancelled either explicitly or because
# its channel is closed. If there was no consumer ever on the queue, it
# won't be deleted.
#
# @option opts [Boolean] :nowait (true) If set, the server will not respond to the method. The client should
# not wait for a reply method. If the server could not complete the
# method it will raise a channel or connection exception.
#
#
# @option opts [Hash] :arguments (nil) A hash of optional arguments for the declaration. Some brokers implement
# AMQP extensions using x-prefixed declaration arguments. For example, RabbitMQ
# recognizes the x-message-ttl declaration argument, which defines the TTL of messages in
# the queue.
#
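# @example Passing declaration arguments (minimal sketch)
#
#   # Illustration only. Assumes `channel` is an open AMQP::Channel and a broker that
#   # supports the x-message-ttl extension (such as RabbitMQ). The TTL is in milliseconds.
#   queue = AMQP::Queue.new(channel, "images.resize", :arguments => { "x-message-ttl" => 60_000 })
#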
#
# @yield [queue, declare_ok] Yields successfully declared queue instance and AMQP method (queue.declare-ok) instance. The latter is optional.
# @yieldparam [Queue] queue Queue that is successfully declared and is ready to be used.
# @yieldparam [AMQ::Protocol::Queue::DeclareOk] declare_ok AMQP method (queue.declare-ok) instance.
#
# @api public
def initialize(channel, name = AMQ::Protocol::EMPTY_STRING, opts = {}, &block)
raise ArgumentError.new("queue name must not be nil; if you want broker to generate queue name for you, pass an empty string") if name.nil?
@channel = channel
@name = name unless name.empty?
@server_named = name.empty?
@opts = self.class.add_default_options(name, opts, block)
raise ArgumentError.new("server-named queues (name = '') declaration with :nowait => true makes no sense. If you are not sure what that means, simply drop :nowait => true from opts.") if @server_named && @opts[:nowait]
# a deferrable that we use to delay operations until this queue is actually declared.
# one reason for this is to support a case when a server-named queue is immediately bound.
# it's crazy, but 0.7.x supports it, so... MK.
@declaration_deferrable = AMQP::Deferrable.new
super(channel.connection)
@name = name
# this has to stay true even after queue.declare-ok arrives. MK.
@server_named = @name.empty?
if @server_named
self.on_connection_interruption do
# server-named queues need to get new names after recovery. MK.
@name = AMQ::Protocol::EMPTY_STRING
end
end
@channel = channel
# primarily for autorecovery. MK.
@bindings = Array.new
@consumers = Hash.new
shim = Proc.new do |q, declare_ok|
case block.arity
when 1 then block.call(q)
else
block.call(q, declare_ok)
end
end
@channel.once_open do
if @opts[:nowait]
@declaration_deferrable.succeed
block.call(self) if block
end
if block
self.queue_declare(@opts[:passive], @opts[:durable], @opts[:exclusive], @opts[:auto_delete], @opts[:nowait], @opts[:arguments], &shim)
else
# we cannot pass :nowait as true here, AMQP::Queue will (rightfully) raise an exception because
# it has no idea about crazy edge cases we are trying to support for sake of backwards compatibility. MK.
self.queue_declare(@opts[:passive], @opts[:durable], @opts[:exclusive], @opts[:auto_delete], false, @opts[:arguments])
end
end
end
# Defines a callback that will be executed once the queue is declared. More than one callback can be defined.
# If the queue is already declared, the given callback is executed immediately.
#
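# @example Running code once the queue is declared (minimal sketch)
#
#   # Illustration only. Assumes `channel` is an open AMQP::Channel.
#   queue = channel.queue("", :exclusive => true)
#   queue.once_declared do
#     # for a server-named queue, the broker-generated name is known at this point
#     puts "#{queue.name} is declared and ready to use"
#   end
#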
# @api public
def once_declared(&block)
@declaration_deferrable.callback do
# guards against cases when deferred operations
# don't complete before the channel is closed
block.call if @channel.open?
end
end # once_declared(&block)
# @return [Boolean] true if this queue was declared as durable (will survive broker restart).
# @api public
def durable?
@durable
end # durable?
# @return [Boolean] true if this queue was declared as exclusive (limited to just one consumer)
# @api public
def exclusive?
@exclusive
end # exclusive?
# @return [Boolean] true if this queue was declared as automatically deleted (deleted as soon as the last consumer is cancelled).
# @api public
def auto_delete?
@auto_delete
end # auto_delete?
# @return [Boolean] true if this queue is server-named
def server_named?
@server_named
end # server_named?
# This method binds a queue to an exchange. Until a queue is
# bound it will not receive any messages. In a classic messaging
# model, store-and-forward queues are bound to a dest exchange
# and subscription queues are bound to a dest_wild exchange.
#
# A valid exchange name (or reference) must be passed as the first
# parameter.
# @example Binding a queue to exchange using AMQP::Exchange instance
#
# ch = AMQP::Channel.new(connection)
# exchange = ch.direct('backlog.events')
# queue = ch.queue('', :exclusive => true)
# queue.bind(exchange)
#
#
# @example Binding a queue to exchange using exchange name
#
# ch = AMQP::Channel.new(connection)
# queue = ch.queue('', :exclusive => true)
# queue.bind('backlog.events')
#
#
# Note that if your producer application knows the consumer's queue name and wants to deliver
# a message there, a direct exchange may be sufficient (in other words, if your code declares an exchange with
# the same name as a queue and binds it to that queue, consider using the default exchange and routing key on publishing).
#
# @param [Exchange] exchange Exchange to bind to. May also be a string or any object that responds to #name.
#
# @option opts [String] :routing_key Specifies the routing key for the binding. The routing key is
# used for routing messages depending on the exchange configuration.
# Not all exchanges use a routing key! Refer to the specific
# exchange documentation. If the routing key is empty and the queue
# name is empty, the routing key will be the current queue for the
# channel, which is the last declared queue.
#
# @option opts [Hash] :arguments (nil) A hash of optional arguments for the binding. The headers exchange type uses these
# attributes for routing matching.
# In addition, brokers may implement AMQP extensions using x-prefixed binding arguments.
#
# @option opts [Boolean] :nowait (true) If set, the server will not respond to the method. The client should
# not wait for a reply method. If the server could not complete the
# method it will raise a channel or connection exception.
# @return [Queue] Self
#
#
# @yield [] Since queue.bind-ok carries no attributes, no parameters are yielded to the block.
#
# @api public
# @see Queue#unbind
def bind(exchange, opts = {}, &block)
@channel.once_open do
self.once_name_is_available do
queue_bind(exchange, (opts[:key] || opts[:routing_key] || AMQ::Protocol::EMPTY_STRING), (opts[:nowait] || block.nil?), opts[:arguments], &block)
end
end
self
end
# @group Error Handling and Recovery
# Used by automatic recovery machinery.
# @private
# @api plugin
def rebind(&block)
@bindings.each { |b| self.bind(b[:exchange], b) }
end
# Called by associated connection object when AMQP connection has been re-established
# (for example, after a network failure).
#
# @api plugin
def auto_recover
self.exec_callback_yielding_self(:before_recovery)
if self.server_named?
old_name = @name.dup
@name = AMQ::Protocol::EMPTY_STRING
@channel.queues.delete(old_name)
end
self.redeclare do
self.rebind
@consumers.each { |tag, consumer| consumer.auto_recover }
self.exec_callback_yielding_self(:after_recovery)
end
end # auto_recover
# @endgroup
# Remove the binding between the queue and exchange. The queue will
# not receive any more messages until it is bound to another
# exchange.
#
# Due to the asynchronous nature of the protocol, it is possible for
# "in flight" messages to be received after this call completes.
# Those messages will be serviced by the last block used in a
# {Queue#subscribe} or {Queue#pop} call.
#
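# @example Unbinding a queue from an exchange (minimal sketch)
#
#   # Illustration only. Assumes `connection` is an open AMQP connection;
#   # the exchange name "backlog.events" is hypothetical.
#   ch       = AMQP::Channel.new(connection)
#   exchange = ch.fanout('backlog.events')
#   queue    = ch.queue('', :exclusive => true)
#   queue.bind(exchange)
#
#   # later: stop receiving messages routed through this exchange
#   queue.unbind(exchange) do
#     puts "Unbound #{queue.name} from #{exchange.name}"
#   end
#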
# @param [Exchange] exchange Exchange to unbind from.
# @option opts [String] :routing_key Binding routing key
# @option opts [Hash] :arguments Binding arguments
# @option opts [Boolean] :nowait (true) If set, the server will not respond to the method. The client should
# not wait for a reply method. If the server could not complete the
# method it will raise a channel or connection exception.
#
#
# @yield [] Since queue.unbind-ok carries no attributes, no parameters are yielded to the block.
#
# @api public
# @see Queue#bind
def unbind(exchange, opts = {}, &block)
@channel.once_open do
self.once_name_is_available do
queue_unbind(exchange, (opts[:key] || opts[:routing_key] || AMQ::Protocol::EMPTY_STRING), opts[:arguments], &block)
end
end
end
# This method deletes a queue. When a queue is deleted any pending
# messages are sent to a dead-letter queue if this is defined in the
# server configuration, and all consumers on the queue are cancelled.
#
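# @example Deleting a queue only if it has no consumers (minimal sketch)
#
#   # Illustration only. Assumes `queue` is a declared AMQP::Queue on an open channel.
#   queue.delete(:if_unused => true) do |delete_ok|
#     puts "Deleted. The queue held #{delete_ok.message_count} message(s)."
#   end
#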
# @return [NilClass] nil (for v0.7 compatibility)
#
# @option opts [Boolean] :if_unused (false) If set, the server will only delete the queue if it has no
# consumers. If the queue has consumers the server does not
# delete it but raises a channel exception instead.
#
# @option opts [Boolean] :if_empty (false) If set, the server will only delete the queue if it has no
# messages. If the queue is not empty the server raises a channel
# exception.
#
# @option opts [Boolean] :nowait (false) If set, the server will not respond to the method. The client should
# not wait for a reply method. If the server could not complete the
# method it will raise a channel or connection exception.
#
#
# @yield [delete_ok] Yields AMQP method (queue.delete-ok) instance.
# @yieldparam [AMQ::Protocol::Queue::DeleteOk] delete_ok AMQP method (queue.delete-ok) instance. Carries number of messages that were in the queue.
#
# @api public
# @see Queue#purge
# @see Queue#unbind
def delete(opts = {}, &block)
@channel.once_open do
self.once_name_is_available do
queue_delete(opts.fetch(:if_unused, false), opts.fetch(:if_empty, false), opts.fetch(:nowait, false), &block)
end
end
# backwards compatibility
nil
end
# This method removes all messages from a queue which are not awaiting acknowledgment.
#
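# @example Purging a queue (minimal sketch)
#
#   # Illustration only. Assumes `queue` is a declared AMQP::Queue on an open channel.
#   queue.purge do |purge_ok|
#     puts "Purged #{purge_ok.message_count} message(s)"
#   end
#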
# @option opts [Boolean] :nowait (false) If set, the server will not respond to the method. The client should
# not wait for a reply method. If the server could not complete the
# method it will raise a channel or connection exception.
#
# @return [NilClass] nil (for v0.7 compatibility)
#
#
# @yield [purge_ok] Yields AMQP method (queue.purge-ok) instance.
# @yieldparam [AMQ::Protocol::Queue::PurgeOk] purge_ok AMQP method (queue.purge-ok) instance. Carries number of messages that were purged.
#
# @api public
# @see Queue#delete
# @see Queue#unbind
def purge(opts = {}, &block)
@channel.once_open do
self.once_declared do
queue_purge(opts.fetch(:nowait, false), &block)
end
end
# backwards compatibility
nil
end
# This method provides a direct access to the messages in a queue
# using a synchronous dialogue that is designed for specific types of
# application where synchronous functionality is more important than
# performance.
#
# If the queue is empty, the `payload` callback argument will be nil; otherwise the arguments
# are identical to those of the {AMQP::Queue#subscribe} callback.
#
# @example Fetching messages off AMQP queue on demand
#
# queue.pop do |metadata, payload|
# if payload
# puts "Fetched a message: #{payload.inspect}, content_type: #{metadata.content_type}. Shutting down..."
# else
# puts "No messages in the queue"
# end
# end
#
# @option opts [Boolean] :ack (false) If this field is set to false the server does not expect acknowledgments
# for messages. That is, when a message is delivered to the client
# the server automatically and silently acknowledges it on behalf
# of the client. This functionality increases performance but at
# the cost of reliability. Messages can get lost if a client dies
# before it can deliver them to the application.
#
#
# @return [Queue] Self
#
#
# @yield [headers, payload] When the block takes only one argument, yields payload to it. In case of two arguments, yields headers and payload.
# @yieldparam [AMQP::Header] headers Headers (metadata) associated with this message (for example, routing key).
# @yieldparam [String] payload Message body (content). On Ruby 1.9, you may want to check or enforce content encoding.
#
# @api public
def pop(opts = {}, &block)
if block
# We have to maintain this multiple arities jazz
# because older versions of this gem are used in examples in at least 3
# books published by O'Reilly :(. MK.
shim = Proc.new { |method, headers, payload|
case block.arity
when 1 then
block.call(payload)
when 2 then
h = Header.new(@channel, method, headers ? headers.decode_payload : nil)
block.call(h, payload)
else
h = Header.new(@channel, method, headers ? headers.decode_payload : nil)
block.call(h, payload, method.delivery_tag, method.redelivered, method.exchange, method.routing_key)
end
}
@channel.once_open do
self.once_name_is_available do
# see AMQP::Queue#get in amq-client
self.get(!opts.fetch(:ack, false), &shim)
end
end
else
@channel.once_open do
self.once_name_is_available do
self.get(!opts.fetch(:ack, false))
end
end
end
end
# Subscribes to asynchronous message delivery.
#
# The provided block is passed a single message each time the
# exchange matches a message to this queue.
#
#
# Calling {Queue#subscribe} more than once on the same queue will raise a
# RuntimeError. If you need more than one consumer per queue, use {AMQP::Consumer} instead.
# {file:docs/Queues.textile Documentation guide on queues} explains this and other topics
# in great detail.
#
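# @example Registering more than one consumer on a queue with AMQP::Consumer (minimal sketch)
#
#   # Illustration only. Assumes `channel` is an open AMQP::Channel and `queue` is a
#   # queue declared on it; the consumer variable names are hypothetical.
#   consumer1 = AMQP::Consumer.new(channel, queue)
#   consumer2 = AMQP::Consumer.new(channel, queue)
#
#   consumer1.consume.on_delivery { |metadata, payload| puts "[consumer1] #{payload}" }
#   consumer2.consume.on_delivery { |metadata, payload| puts "[consumer2] #{payload}" }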
#
# @example Use of callback with a single argument
#
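#   EventMachine.run do
#     connection = AMQP.connect(:host => '127.0.0.1')
#     channel    = AMQP::Channel.new(connection)
#     exchange   = channel.direct("foo queue")
#     EM.add_periodic_timer(1) do
#       exchange.publish("random number #{rand(1000)}")
#     end
#
#     queue = channel.queue('foo queue')
#     # bind with the default (empty) routing key so that published messages reach the queue
#     queue.bind(exchange)
#     queue.subscribe { |body| puts "received payload [#{body}]" }
#   end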
#
# If the block takes 2 parameters, both the header and the body will
# be passed in for processing.
#
# @example Use of callback with two arguments
#
# EventMachine.run do
# connection = AMQP.connect(:host => '127.0.0.1')
# puts "Connected to AMQP broker. Running #{AMQP::VERSION} version of the gem..."
#
# channel = AMQP::Channel.new(connection)
# queue = channel.queue("amqpgem.examples.hello_world", :auto_delete => true)
# exchange = channel.direct("amq.direct")
#
# queue.bind(exchange)
#
# channel.on_error do |ch, channel_close|
# puts channel_close.reply_text
# connection.close { EventMachine.stop }
# end
#
# queue.subscribe do |metadata, payload|
# puts "metadata.routing_key : #{metadata.routing_key}"
# puts "metadata.content_type: #{metadata.content_type}"
# puts "metadata.priority : #{metadata.priority}"
# puts "metadata.headers : #{metadata.headers.inspect}"
# puts "metadata.timestamp : #{metadata.timestamp.inspect}"
# puts "metadata.type : #{metadata.type}"
# puts "metadata.delivery_tag: #{metadata.delivery_tag}"
# puts "metadata.redelivered : #{metadata.redelivered}"
#
# puts "metadata.app_id : #{metadata.app_id}"
# puts "metadata.exchange : #{metadata.exchange}"
# puts
# puts "Received a message: #{payload}. Disconnecting..."
#
# connection.close {
# EventMachine.stop { exit }
# }
# end
#
# exchange.publish("Hello, world!",
# :app_id => "amqpgem.example",
# :priority => 8,
# :type => "kinda.checkin",
# # headers table keys can be anything
# :headers => {
# :coordinates => {
# :latitude => 59.35,
# :longitude => 18.066667
# },
# :participants => 11,
# :venue => "Stockholm"
# },
# :timestamp => Time.now.to_i)
# end
#
# @example Using object as consumer (message handler), take one
#
# class Consumer
#
# #
# # API
# #
#
# def initialize(channel, queue_name = AMQ::Protocol::EMPTY_STRING)
# @queue_name = queue_name
#
# @channel = channel
# # Consumer#handle_channel_exception will handle channel
# # exceptions. Keep in mind that you can only register one error handler,
# # so the last one registered "wins".
# @channel.on_error(&method(:handle_channel_exception))
# end # initialize
#
# def start
# @queue = @channel.queue(@queue_name, :exclusive => true)
# # #handle_message method will be handling messages routed to @queue
# @queue.subscribe(&method(:handle_message))
# end # start
#
#
#
# #
# # Implementation
# #
#
# def handle_message(metadata, payload)
# puts "Received a message: #{payload}, content_type = #{metadata.content_type}"
# end # handle_message(metadata, payload)
#
# def handle_channel_exception(channel, channel_close)
# puts "Oops... a channel-level exception: code = #{channel_close.reply_code}, message = #{channel_close.reply_text}"
# end # handle_channel_exception(channel, channel_close)
# end
#
#
# @example Using object as consumer (message handler), take two: aggregated handler
# class Consumer
#
# #
# # API
# #
#
# def handle_message(metadata, payload)
# puts "Received a message: #{payload}, content_type = #{metadata.content_type}"
# end # handle_message(metadata, payload)
# end
#
#
# class Worker
#
# #
# # API
# #
#
#
# def initialize(channel, queue_name = AMQ::Protocol::EMPTY_STRING, consumer = Consumer.new)
# @queue_name = queue_name
#
# @channel = channel
# @channel.on_error(&method(:handle_channel_exception))
#
# @consumer = consumer
# end # initialize
#
# def start
# @queue = @channel.queue(@queue_name, :exclusive => true)
# @queue.subscribe(&@consumer.method(:handle_message))
# end # start
#
#
#
# #
# # Implementation
# #
#
# def handle_channel_exception(channel, channel_close)
# puts "Oops... a channel-level exception: code = #{channel_close.reply_code}, message = #{channel_close.reply_text}"
# end # handle_channel_exception(channel, channel_close)
# end
#
# @example Unit-testing objects that are used as consumers, RSpec style
#
# require "ostruct"
# require "json"
#
# # RSpec example
# describe Consumer do
# describe "when a new message arrives" do
# subject { described_class.new }
#
# let(:metadata) do
# o = OpenStruct.new
#
# o.content_type = "application/json"
# o
# end
# let(:payload) { JSON.generate({ :command => "reload_config" }) }
#
# it "does some useful work" do
# # check preconditions here if necessary
#
# subject.handle_message(metadata, payload)
#
# # add your code expectations here
# end
# end
# end
#
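# @example Subscribing with explicit acknowledgements (minimal sketch)
#
#   # Illustration only. Assumes `queue` is a declared AMQP::Queue on an open channel.
#   queue.subscribe(:ack => true) do |metadata, payload|
#     puts "Processing #{payload}"
#     # acknowledge only after the message has been handled
#     metadata.ack
#   end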
#
#
# @option opts [Boolean] :ack (false) If this field is set to false the server does not expect acknowledgments
# for messages. That is, when a message is delivered to the client
# the server automatically and silently acknowledges it on behalf
# of the client. This functionality increases performance but at
# the cost of reliability. Messages can get lost if a client dies
# before it can deliver them to the application.
#
# @option opts [Boolean] :nowait (false) If set, the server will not respond to the method. The client should
# not wait for a reply method. If the server could not complete the
# method it will raise a channel or connection exception.
#
# @option opts [#call] :confirm (nil) If set, this proc will be called when the server confirms subscription
# to the queue with a basic.consume-ok message. Setting this option will
# automatically set :nowait => false. This is required for the server
# to send a confirmation.
#
# @option opts [Boolean] :exclusive (false) Request exclusive consumer access, meaning only this consumer can access the queue.
# This is useful when you want a long-lived shared queue to be temporarily accessible by just
# one application (or thread, or process). If the application the exclusive consumer is part of crashes
# or loses its network connection to the broker, the channel is closed and the exclusive consumer is thus cancelled.
#
#
# @yield [headers, payload] When the block takes only one argument, yields payload to it. In case of two arguments, yields headers and payload.
# @yieldparam [AMQP::Header] headers Headers (metadata) associated with this message (for example, routing key).
# @yieldparam [String] payload Message body (content). On Ruby 1.9, you may want to check or enforce content encoding.
#
# @return [Queue] Self
# @api public
#
# @see file:docs/Queues.textile Documentation guide on queues
# @see #unsubscribe
# @see AMQP::Consumer
def subscribe(opts = {}, &block)
raise RuntimeError.new("This queue already has default consumer. Please instantiate AMQP::Consumer directly to register additional consumers.") if @default_consumer
opts[:nowait] = false if (@on_confirm_subscribe = opts[:confirm])
@channel.once_open do
self.once_name_is_available do
# guards against a pathological case race condition when a channel
# is opened and closed before delayed operations are completed.
self.basic_consume(!opts[:ack], opts[:exclusive], (opts[:nowait] || block.nil?), opts[:no_local], nil, &opts[:confirm])
self.on_delivery(&block)
end
end
self
end
# Registers a block that handles messages delivered to the default consumer of this queue.
#
# @api public
# @see http://bit.ly/amqp091reference AMQP 0.9.1 protocol reference (Section 1.8.3.9)
def on_delivery(&block)
@default_consumer.on_delivery(&block)
end # on_delivery(&block)
# @return [String] Consumer tag of the default consumer associated with this queue (if any), or nil
# @note Default consumer is the one registered with the convenience {AMQP::Queue#subscribe} method. It has no special properties of any kind.
# @see Queue#subscribe
# @see AMQP::Consumer
# @api public
def consumer_tag
if @default_consumer
@default_consumer.consumer_tag
else
nil
end
end # consumer_tag
# @return [AMQP::Consumer] Default consumer associated with this queue (if any), or nil
# @note Default consumer is the one registered with the convenience {AMQP::Queue#subscribe} method. It has no special properties of any kind.
# @see Queue#subscribe
# @see AMQP::Consumer
# @api public
def default_consumer
@default_consumer
end
# @return [Class]
# @private
def self.consumer_class
AMQP::Consumer
end # self.consumer_class
# Removes the subscription from the queue and cancels the consumer. Once the consumer is cancelled,
# messages will no longer be delivered to it. However, due to the asynchronous nature of the protocol, it is possible for
# "in flight" messages to be received after this call completes.
# Those messages will be serviced by the last block used in a
# {Queue#subscribe} or {Queue#pop} call.
#
# Fetching messages with {AMQP::Queue#pop} is still possible even after consumer is cancelled.
#
# Additionally, if the queue was created with _autodelete_ set to
# true, the server will delete the queue after its wait period
# has expired unless the queue is bound to an active exchange.
#
# The method accepts a block which will be executed when the
# unsubscription request is acknowledged as complete by the server.
#
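# @example Cancelling the default consumer and waiting for confirmation (minimal sketch)
#
#   # Illustration only. Assumes `queue` has a default consumer registered with #subscribe.
#   # :nowait => false is passed explicitly because the callback is ignored otherwise.
#   queue.unsubscribe(:nowait => false) do |cancel_ok|
#     puts "Cancelled consumer #{cancel_ok.consumer_tag}"
#   end
#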
# @option opts [Boolean] :nowait (true) If set, the server will not respond to the method. The client should
# not wait for a reply method, the callback (if passed) will be ignored. If the server could not complete the
# method it will raise a channel or connection exception.
#
# @yield [cancel_ok]
# @yieldparam [AMQ::Protocol::Basic::CancelOk] cancel_ok AMQP method basic.cancel-ok. You can obtain consumer tag from it.
#
#
# @api public
def unsubscribe(opts = {}, &block)
@channel.once_open do
self.once_name_is_available do
if @default_consumer
@default_consumer.cancel(opts.fetch(:nowait, true), &block)
@default_consumer = nil
end
end
end
end
# Get the number of messages and active consumers (with active channel flow) on a queue.
#
# @example Getting number of messages and active consumers for a queue
#
#   channel = AMQP::Channel.new(connection)
#   channel.queue('name').status { |number_of_messages, number_of_active_consumers|
#     puts number_of_messages
#   }
#
# @yield [number_of_messages, number_of_active_consumers]
# @yieldparam [Fixnum] number_of_messages Number of messages in the queue
# @yieldparam [Fixnum] number_of_active_consumers Number of active consumers for the queue. Note that consumers can suspend activity (Channel.Flow) in which case they do not appear in this count.
#
# @api public
def status(opts = {}, &block)
raise ArgumentError, "AMQP::Queue#status does not make any sense without a block" unless block
shim = Proc.new { |q, declare_ok| block.call(declare_ok.message_count, declare_ok.consumer_count) }
@channel.once_open do
self.once_name_is_available do
# we do not use self.declare here to avoid caching of @passive since that will cause unexpected side-effects during automatic
# recovery process. MK.
@connection.send_frame(AMQ::Protocol::Queue::Declare.encode(@channel.id, @name, true, @opts[:durable], @opts[:exclusive], @opts[:auto_delete], false, @opts[:arguments]))
self.append_callback(:declare, &shim)
@channel.queues_awaiting_declare_ok.push(self)
end
end
self
end
# Boolean check to see if the current queue has already subscribed
# to messages delivery (has default consumer).
#
# Calling {Queue#subscribe} more than once on the same queue will raise a
# RuntimeError. If you need more than one consumer per queue, use {AMQP::Consumer} instead.
#
# @return [Boolean] true if there is a consumer tag associated with this Queue instance
# @api public
# @deprecated
def subscribed?
@default_consumer && @default_consumer.subscribed?
end
# Returns the callback of the default consumer, or nil if this queue is not subscribed.
#
# @api public
# @deprecated
def callback
return nil if !subscribed?
@default_consumer.callback
end
# Don't use this method. It is a leftover from very early days and
# it ruins the whole point of exchanges/queue separation.
#
# @note This method will be removed before 1.0 release
# @deprecated
# @api public
def publish(data, opts = {})
exchange.publish(data, opts.merge(:routing_key => self.name))
end
# Resets queue state. Useful for error handling.
# @api plugin
def reset
initialize(@channel, @name, @opts)
end
# @private
# @api plugin
def handle_connection_interruption(method = nil)
@consumers.each { |tag, consumer| consumer.handle_connection_interruption(method) }
self.exec_callback_yielding_self(:after_connection_interruption)
@declaration_deferrable = EventMachine::DefaultDeferrable.new
end
def handle_declare_ok(method)
@name = method.queue if @name.empty?
@channel.register_queue(self)
self.exec_callback_once_yielding_self(:declare, method)
@declaration_deferrable.succeed
end
# @group Declaration
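# Declares this queue.
#
# @param [Boolean] passive If true, only check that the queue exists; do not create it.
# @param [Boolean] durable If true, the queue survives broker restarts.
# @param [Boolean] exclusive If true, the queue can only be used by the declaring connection.
# @param [Boolean] auto_delete If true, the queue is deleted once its last consumer is gone.
# @param [Boolean] nowait Don't wait for reply from broker. Not allowed for server-named queues.
# @param [Hash] arguments Optional broker-specific arguments.
#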
# @return [Queue] self
#
# @api public
# @see http://bit.ly/amqp091reference AMQP 0.9.1 protocol reference (Section 1.7.2.1.)
def queue_declare(passive = false, durable = false, exclusive = false, auto_delete = false, nowait = false, arguments = nil, &block)
raise ArgumentError, "declaration with nowait does not make sense for server-named queues! Either specify name other than empty string or use #declare without nowait" if nowait && self.anonymous?
# these attributes are remembered for autorecovery (see #redeclare). MK.
@passive = passive
@server_named = @name.empty?
@durable = durable
@exclusive = exclusive
@auto_delete = auto_delete
@arguments = arguments
nowait = true if !block && !@name.empty? && nowait.nil?
@connection.send_frame(AMQ::Protocol::Queue::Declare.encode(@channel.id, @name, passive, durable, exclusive, auto_delete, nowait, arguments))
if !nowait
self.append_callback(:declare, &block)
@channel.queues_awaiting_declare_ok.push(self)
end
self
end
# Re-declares queue with the same attributes
# @api public
def redeclare(&block)
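# Skip waiting for queue.declare-ok only when there is no callback and the broker
# does not have to generate a name for this queue.
nowait = block.nil? && !@name.empty? && !@server_named
# server-named queues get their new generated names.
new_name = if @server_named
AMQ::Protocol::EMPTY_STRING
else
@name
end
# The nowait flag sent to the broker must match the bookkeeping below, otherwise
# an unexpected queue.declare-ok would be routed to the wrong queue.
@connection.send_frame(AMQ::Protocol::Queue::Declare.encode(@channel.id, new_name, @passive, @durable, @exclusive, @auto_delete, nowait, @arguments))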
if !nowait
self.append_callback(:declare, &block)
@channel.queues_awaiting_declare_ok.push(self)
end
self
end
# @endgroup
# Deletes this queue.
#
# @param [Boolean] if_unused delete only if queue has no consumers (subscribers).
# @param [Boolean] if_empty delete only if queue has no messages in it.
# @param [Boolean] nowait Don't wait for reply from broker.
# @return [Queue] self
#
# @api public
# @see http://bit.ly/amqp091reference AMQP 0.9.1 protocol reference (Section 1.7.2.9.)
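#
# @example Deleting a queue only if it has no consumers (a minimal sketch, not the canonical usage)
#   # Assumes `queue` is an already declared AMQP::Queue on an open channel.
#   # The block receives the decoded queue.delete-ok method (see #handle_delete_ok).
#   queue.queue_delete(true, false) do |delete_ok|
#     puts "Deleted #{queue.name} along with #{delete_ok.message_count} message(s)"
#   end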
def queue_delete(if_unused = false, if_empty = false, nowait = false, &block)
nowait = true unless block
@connection.send_frame(AMQ::Protocol::Queue::Delete.encode(@channel.id, @name, if_unused, if_empty, nowait))
if !nowait
self.append_callback(:delete, &block)
# TODO: delete itself from queues cache
@channel.queues_awaiting_delete_ok.push(self)
end
self
end # queue_delete(if_unused = false, if_empty = false, nowait = false, &block)
# @group Binding
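# Binds this queue to an exchange.
#
# @param [Exchange, String] exchange Exchange object or exchange name to bind to.
# @param [String] routing_key Routing key for the binding.
# @param [Boolean] nowait Don't wait for reply from broker. Forced to true when no block is given.
# @param [Hash] arguments Optional binding arguments.
#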
# @return [Queue] self
#
# @api public
# @see http://bit.ly/amqp091reference AMQP 0.9.1 protocol reference (Section 1.7.2.3.)
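#
# @example Binding a queue and waiting for queue.bind-ok (a minimal sketch)
#   # Assumes `queue` is an already declared AMQP::Queue and `exchange` is an
#   # AMQP::Exchange on the same channel. The routing key is hypothetical.
#   # The block receives the decoded queue.bind-ok method (see #handle_bind_ok).
#   queue.queue_bind(exchange, "images.resize") do |bind_ok|
#     puts "Bound #{queue.name} to #{exchange.name}"
#   end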
def queue_bind(exchange, routing_key = AMQ::Protocol::EMPTY_STRING, nowait = false, arguments = nil, &block)
nowait = true unless block
exchange_name = if exchange.respond_to?(:name)
exchange.name
else
exchange
end
@connection.send_frame(AMQ::Protocol::Queue::Bind.encode(@channel.id, @name, exchange_name, routing_key, nowait, arguments))
if !nowait
self.append_callback(:bind, &block)
@channel.queues_awaiting_bind_ok.push(self)
end
# store bindings for automatic recovery, but BE VERY CAREFUL to
# not cause an infinite rebinding loop here when we recover. MK.
binding = { :exchange => exchange_name, :routing_key => routing_key, :arguments => arguments }
@bindings.push(binding) unless @bindings.include?(binding)
self
end
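# Removes a binding between this queue and an exchange.
#
# @param [Exchange, String] exchange Exchange object or exchange name to unbind from.
# @param [String] routing_key Routing key of the binding to remove.
# @param [Hash] arguments Arguments of the binding to remove.
#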
# @return [Queue] self
#
# @api public
# @see http://bit.ly/amqp091reference AMQP 0.9.1 protocol reference (Section 1.7.2.5.)
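#
# @example Removing a binding by exchange name (a minimal sketch)
#   # Assumes `queue` is an already declared AMQP::Queue that was previously bound
#   # to a hypothetical exchange named "images" with this routing key.
#   # An exchange name works here because only objects responding to #name are unwrapped.
#   queue.queue_unbind("images", "images.resize") do |unbind_ok|
#     puts "Unbound #{queue.name} from images"
#   end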
def queue_unbind(exchange, routing_key = AMQ::Protocol::EMPTY_STRING, arguments = nil, &block)
exchange_name = if exchange.respond_to?(:name)
exchange.name
else
exchange
end
@connection.send_frame(AMQ::Protocol::Queue::Unbind.encode(@channel.id, @name, exchange_name, routing_key, arguments))
self.append_callback(:unbind, &block)
@channel.queues_awaiting_unbind_ok.push(self)
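# Forget only the binding that was actually removed: other bindings to the same
# exchange must stay recorded so that automatic recovery can restore them.
@bindings.delete_if { |b| b[:exchange] == exchange_name && b[:routing_key] == routing_key && b[:arguments] == arguments }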
self
end
# @endgroup
# @group Consuming messages
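# Registers the default consumer for this queue and starts consuming (basic.consume).
# Raises RuntimeError if this queue already has a default consumer.
#
# @param [Boolean] no_ack If true, the broker does not expect acknowledgements for delivered messages.
# @param [Boolean] exclusive If true, request exclusive consumer access to the queue.
# @param [Boolean] nowait Don't wait for reply from broker. Forced to true when no block is given.
# @param [Boolean] no_local If true, the broker will not deliver messages published on the same connection.
# @param [Hash] arguments Optional consumer arguments.
#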
# @return [Queue] self
#
# @api public
# @see http://bit.ly/amqp091reference AMQP 0.9.1 protocol reference (Section 1.8.3.3.)
def basic_consume(no_ack = false, exclusive = false, nowait = false, no_local = false, arguments = nil, &block)
raise RuntimeError.new("This queue already has default consumer. Please instantiate AMQP::Consumer directly to register additional consumers.") if @default_consumer
nowait = true unless block
@default_consumer = self.class.consumer_class.new(@channel, self, generate_consumer_tag(@name), exclusive, no_ack, arguments, no_local, &block)
@default_consumer.consume(nowait, &block)
self
end
# Unsubscribes from message delivery.
# @return [Queue] self
#
# @api public
# @see http://bit.ly/amqp091reference AMQP 0.9.1 protocol reference (Section 1.8.3.5.)
def cancel(nowait = false, &block)
raise "There is no default consumer for this queue. This usually means that you are trying to unsubscribe a queue that never was subscribed for messages in the first place." if @default_consumer.nil?
@default_consumer.cancel(nowait, &block)
self
end # cancel(&block)
# @api public
def on_cancel(&block)
@default_consumer.on_cancel(&block)
end # on_cancel(&block)
# @endgroup
# @group Working With Messages
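# Fetches a single message from the queue (basic.get).
# The block is called with the response, message header and payload. Header and
# payload are not passed when the queue is empty (basic.get-empty).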
# @return [Queue] self
#
# @api public
# @see http://bit.ly/amqp091reference AMQP 0.9.1 protocol reference (Section 1.8.3.10.)
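#
# @example Fetching one message and handling an empty queue (a minimal sketch)
#   # Assumes `queue` is an already declared AMQP::Queue on an open channel.
#   # `response.empty?` is assumed to be provided by AMQ::Protocol::GetResponse.
#   queue.get(true) do |response, header, payload|
#     if response.empty?
#       puts "#{queue.name} is empty"
#     else
#       puts "Fetched #{payload.inspect}"
#     end
#   end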
def get(no_ack = false, &block)
@connection.send_frame(AMQ::Protocol::Basic::Get.encode(@channel.id, @name, no_ack))
# most people only want one callback per #get call. Consider the following example:
#
# 100.times { queue.get { ... } }
#
# most likely you won't expect 100 callback runs per message here. MK.
self.redefine_callback(:get, &block)
@channel.queues_awaiting_get_response.push(self)
self
end # get(no_ack = false, &block)
# Purges (removes all messages from) the queue.
# @return [Queue] self
#
# @api public
# @see http://bit.ly/amqp091reference AMQP 0.9.1 protocol reference (Section 1.7.2.7.)
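#
# @example Purging a queue and reporting how many messages were removed (a minimal sketch)
#   # Assumes `queue` is an already declared AMQP::Queue on an open channel.
#   # The block receives the decoded queue.purge-ok method (see #handle_purge_ok).
#   queue.queue_purge do |purge_ok|
#     puts "Removed #{purge_ok.message_count} message(s) from #{queue.name}"
#   end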
def queue_purge(nowait = false, &block)
nowait = true unless block
@connection.send_frame(AMQ::Protocol::Queue::Purge.encode(@channel.id, @name, nowait))
if !nowait
self.redefine_callback(:purge, &block)
# TODO: handle channel & connection-level exceptions
@channel.queues_awaiting_purge_ok.push(self)
end
self
end # queue_purge(nowait = false, &block)
# @endgroup
# @group Acknowledging & Rejecting Messages
# Acknowledge a delivery tag.
# @return [Queue] self
#
# @api public
# @see http://bit.ly/amqp091reference AMQP 0.9.1 protocol reference (Section 1.8.3.13.)
def acknowledge(delivery_tag)
@channel.acknowledge(delivery_tag)
self
end # acknowledge(delivery_tag)
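# Rejects a message identified by its delivery tag.
#
# @param [Integer] delivery_tag Delivery tag of the message to reject.
# @param [Boolean] requeue If true, the broker will requeue the message.
#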
# @return [Queue] self
#
# @api public
# @see http://bit.ly/amqp091reference AMQP 0.9.1 protocol reference (Section 1.8.3.14.)
def reject(delivery_tag, requeue = true)
@channel.reject(delivery_tag, requeue)
self
end # reject(delivery_tag, requeue = true)
# @endgroup
# @group Error Handling & Recovery
# Defines a callback that will be executed after TCP connection is interrupted (typically because of a network failure).
# Only one callback can be defined (the one defined last replaces previously added ones).
#
# @api public
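#
# @example Logging a lost connection (a minimal sketch)
#   # Assumes `queue` is an AMQP::Queue. The block receives the queue itself
#   # because the callback is run with exec_callback_yielding_self.
#   queue.on_connection_interruption do |q|
#     puts "Connection lost, #{q.name} is waiting for recovery"
#   end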
def on_connection_interruption(&block)
self.redefine_callback(:after_connection_interruption, &block)
end # on_connection_interruption(&block)
alias after_connection_interruption on_connection_interruption
# Defines a callback that will be executed after TCP connection is recovered after a network failure
# but before AMQP connection is re-opened.
# Only one callback can be defined (the one defined last replaces previously added ones).
#
# @api public
def before_recovery(&block)
self.redefine_callback(:before_recovery, &block)
end # before_recovery(&block)
# @private
def run_before_recovery_callbacks
self.exec_callback_yielding_self(:before_recovery)
@consumers.each { |tag, c| c.run_before_recovery_callbacks }
end
# Defines a callback that will be executed when AMQP connection is recovered after a network failure.
# Only one callback can be defined (the one defined last replaces previously added ones).
#
# @api public
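#
# @example Reacting to a recovered connection (a minimal sketch)
#   # Assumes `queue` is an AMQP::Queue. The block receives the queue itself.
#   # During #auto_recover it runs after the queue is redeclared and rebound.
#   queue.on_recovery do |q|
#     puts "#{q.name} has been recovered"
#   end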
def on_recovery(&block)
self.redefine_callback(:after_recovery, &block)
end # on_recovery(&block)
alias after_recovery on_recovery
# @private
def run_after_recovery_callbacks
self.exec_callback_yielding_self(:after_recovery)
@consumers.each { |tag, c| c.run_after_recovery_callbacks }
end
# Called by associated connection object when AMQP connection has been re-established
# (for example, after a network failure).
#
# @api plugin
def auto_recover
self.exec_callback_yielding_self(:before_recovery)
self.redeclare do
self.rebind
@consumers.each { |tag, consumer| consumer.auto_recover }
self.exec_callback_yielding_self(:after_recovery)
end
end # auto_recover
# @endgroup
#
# Implementation
#
# Unique string supposed to be used as a consumer tag.
#
# @return [String] Unique string.
# @api plugin
def generate_consumer_tag(name)
"#{name}-#{Time.now.to_i * 1000}-#{Kernel.rand(999_999_999_999)}"
end
def handle_delete_ok(method)
self.exec_callback_once(:delete, method)
end # handle_delete_ok(method)
def handle_purge_ok(method)
self.exec_callback_once(:purge, method)
end # handle_purge_ok(method)
def handle_bind_ok(method)
self.exec_callback_once(:bind, method)
end # handle_bind_ok(method)
def handle_unbind_ok(method)
self.exec_callback_once(:unbind, method)
end # handle_unbind_ok(method)
def handle_get_ok(method, header, payload)
method = AMQ::Protocol::GetResponse.new(method)
self.exec_callback(:get, method, header, payload)
end # handle_get_ok(method, header, payload)
def handle_get_empty(method)
method = AMQ::Protocol::GetResponse.new(method)
self.exec_callback(:get, method)
end # handle_get_empty(method)
# Get the first queue which didn't receive Queue.Declare-Ok yet and run its declare callback.
# The cache includes only queues with {nowait: false}.
self.handle(AMQ::Protocol::Queue::DeclareOk) do |connection, frame|
method = frame.decode_payload
channel = connection.channels[frame.channel]
queue = channel.queues_awaiting_declare_ok.shift
queue.handle_declare_ok(method)
end
self.handle(AMQ::Protocol::Queue::DeleteOk) do |connection, frame|
channel = connection.channels[frame.channel]
queue = channel.queues_awaiting_delete_ok.shift
queue.handle_delete_ok(frame.decode_payload)
end
self.handle(AMQ::Protocol::Queue::BindOk) do |connection, frame|
channel = connection.channels[frame.channel]
queue = channel.queues_awaiting_bind_ok.shift
queue.handle_bind_ok(frame.decode_payload)
end
self.handle(AMQ::Protocol::Queue::UnbindOk) do |connection, frame|
channel = connection.channels[frame.channel]
queue = channel.queues_awaiting_unbind_ok.shift
queue.handle_unbind_ok(frame.decode_payload)
end
self.handle(AMQ::Protocol::Queue::PurgeOk) do |connection, frame|
channel = connection.channels[frame.channel]
queue = channel.queues_awaiting_purge_ok.shift
queue.handle_purge_ok(frame.decode_payload)
end
self.handle(AMQ::Protocol::Basic::GetOk) do |connection, frame, content_frames|
channel = connection.channels[frame.channel]
queue = channel.queues_awaiting_get_response.shift
method = frame.decode_payload
header = content_frames.shift
body = content_frames.map { |content_frame| content_frame.payload }.join
queue.handle_get_ok(method, header, body) if queue
end
self.handle(AMQ::Protocol::Basic::GetEmpty) do |connection, frame|
channel = connection.channels[frame.channel]
queue = channel.queues_awaiting_get_response.shift
queue.handle_get_empty(frame.decode_payload)
end
protected
# @private
def self.add_default_options(name, opts, block)
{ :queue => name, :nowait => (block.nil? && !name.empty?) }.merge(opts)
end
def once_name_is_available(&block)
if server_named?
self.once_declared do
block.call
end
else
block.call
end
end
private
# Default direct exchange that we use to publish messages directly to this queue.
# This is a leftover from very early days and will be removed before version 1.0.
#
# @deprecated
def exchange
@exchange ||= Exchange.new(@channel, :direct, AMQ::Protocol::EMPTY_STRING, :key => name)
end
end # Queue
end # AMQP