lib/amqp/queue.rb in amqp-0.8.0.rc2 vs lib/amqp/queue.rb in amqp-0.8.0.rc3
- old
+ new
@@ -8,31 +8,87 @@
# Queues store and forward messages to consumers. They similar to mailboxes in SMTP.
# Messages flow from producing applications to {Exchange exchanges} that route them
# to queues and finally queues deliver them to consumer applications (or consumer
# applications fetch messages as needed).
#
+ # Note that unlike some other messaging protocols/systems, messages are not delivered directly
+ # to queues. They are delivered to exchanges that route messages to queues using rules
+ # knows as *bindings*.
#
+ #
# h2. Concept of bindings
#
+ # Binding is an association between a queue and an exchange.
# Queues must be bound to at least one exchange in order to receive messages from publishers.
# Learn more about bindings in {Exchange Exchange class documentation}.
#
#
+ # h2. Key methods
+ #
+ # Key methods of Queue class are
+ #
+ # * {Queue#bind}
+ # * {Queue#subscribe}
+ # * {Queue#pop}
+ # * {Queue#delete}
+ # * {Queue#purge}
+ # * {Queue#unbind}
+ #
+ #
# h2. Queue names. Server-named queues. Predefined queues.
#
- # Like an Exchange, queue names starting with 'amq.' are reserved for
- # internal use. Attempts to create queue names in violation of this
- # reservation will raise AMQP::Error (ACCESS_REFUSED).
+ # Every queue has a name that identifies it. Queue names often contain several segments separated by a dot (.), similarly to how URI
+ # path segments are separated by a slash (/), although it may be almost any string, with some limitations (see below).
+ # Applications may pick queue names or ask broker to generate a name for them. To do so, pass *empty string* as queue name argument.
#
- # When a queue is created without a name, the server will generate a
- # unique name internally (not currently supported in this library).
+ # Here is an example:
#
+ # @example Declaring a server-named queue using AMQP::Queue constructor
+ # AMQP.start("amqp://guest:guest@dev.rabbitmq.com:5672/") do |connection, open_ok|
+ # AMQP::Channel.new do |channel, open_ok|
+ # AMQP::Queue.new(channel, "", :auto_delete => true) do |queue, declare_ok|
+ # puts "#{queue.name} is ready to go. AMQP method: #{declare_ok.inspect}"
+ #
+ # connection.close {
+ # EM.stop { exit }
+ # }
+ # end
+ # end
+ # end
+ #
+ # <script src="https://gist.github.com/939596.js?file=gistfile1.rb"></script>
+ #
+ # If you want to declare a queue with a particular name, for example, "images.resize", pass it to Queue class constructor:
+ #
+ # @example Declaring a server-named queue using AMQP::Queue constructor
+ # AMQP.start("amqp://guest:guest@dev.rabbitmq.com:5672/") do |connection, open_ok|
+ # AMQP::Channel.new do |channel, open_ok|
+ # AMQP::Queue.new(channel, "images.resize", :auto_delete => true) do |queue, declare_ok|
+ # puts "#{queue.name} is ready to go."
+ #
+ # connection.close {
+ # EM.stop { exit }
+ # }
+ # end
+ # end
+ # end
+ #
+ # <script src="https://gist.github.com/939600.js?file=gistfile1.rb"></script>
+ #
+ # Queue names starting with 'amq.' are reserved for internal use by the broker. Attempts to declare queue with a name that violates this
+ # rule will result in AMQP::IncompatibleOptionsError to be thrown (when
+ # queue is re-declared on the same channel object) or channel-level exception (when originally queue
+ # was declared on one channel and re-declaration with different attributes happens on another channel).
+ # Learn more in {file:docs/Queues.textile Queues guide} and {file:docs/ErrorHandling.textile Error Handling guide}.
+ #
+ #
+ #
# h2. Queue life-cycles. When use of server-named queues is optimal and when it isn't.
#
# To quote AMQP 0.9.1 spec, there are two common message queue life-cycles:
#
- # * Durable message queues that are shared by many consumers and have an independent existence: i.e. they
+ # * Durable message queues that are shared by many consumers and have an independent existence: i.e. they
# will continue to exist and collect messages whether or not there are consumers to receive them.
# * Temporary message queues that are private to one consumer and are tied to that consumer. When the
# consumer disconnects, the message queue is deleted.
#
# There are some variations on these, such as shared message queues that are deleted when the last of
@@ -76,30 +132,25 @@
# are not lost. Simply publishing message to a durable exchange or the fact that queue(s) they are routed to
# is durable doesn't make messages persistent: it all depends on persistence mode of the messages itself.
# Publishing messages as persistent affects performance (just like with data stores, durability comes at a certain cost
# in performance and vise versa). Pass :persistent => true to {Exchange#publish} to publish your message as persistent.
#
- # Note that *only durable queues can be bound to durable exchanges*.
+ # Note that *only durable queues can be bound to durable exchanges*. Learn more in our {file:docs/Durability.textile Durability guide}.
#
#
# h2. Message ordering
#
# RabbitMQ FAQ explains {http://www.rabbitmq.com/faq.html#message-ordering ordering of messages in AMQP queues}
#
#
- # h2. Key methods
+ # h2. Error handling
#
- # Key methods of Queue class are
+ # When channel-level error occurs, queues associated with that channel are reset: internal state and callbacks
+ # are cleared. Recommended strategy is to open a new channel and re-declare all the entities you need.
+ # Learn more in {file:docs/ErrorHandling.textile Error Handling guide}.
#
- # * {Queue#bind}
- # * {Queue#subscribe}
- # * {Queue#pop}
- # * {Queue#delete}
- # * {Queue#purge}
- # * {Queue#unbind}
#
- #
# @note Please make sure you read a section on queue durability vs. messages
# persistence.
#
#
# @see http://bit.ly/hw2ELX AMQP 0.9.1 specification (Section 2.1.1)
@@ -110,15 +161,12 @@
# API
#
# Name of this queue
attr_reader :name
- attr_reader :sync_bind
# Options this queue object was instantiated with
attr_accessor :opts
- attr_accessor :on_declare
- attr_accessor :on_bind
# @option opts [Boolean] :passive (false) If set, the server will not create the queue if it does not
# already exist. The client can use this to check whether the queue
@@ -161,23 +209,31 @@
# @api public
def initialize(channel, name = AMQ::Protocol::EMPTY_STRING, opts = {}, &block)
@channel = channel
name = AMQ::Protocol::EMPTY_STRING if name.nil?
@name = name unless name.empty?
- @opts = self.class.add_default_options(name, opts, block)
- @bindings = Hash.new
+ @server_named = name.empty?
+ @opts = self.class.add_default_options(name, opts, block)
+ @bindings = Hash.new
+ # a deferrable that we use to delay operations until this queue is actually declared.
+ # one reason for this is to support a case when a server-named queue is immediately bound.
+ # it's crazy, but 0.7.x supports it, so... MK.
+ @declaration_deferrable = AMQ::Client::EventMachineClient::Deferrable.new
+
if @opts[:nowait]
@status = :opened
block.call(self) if block
else
@status = :opening
end
super(channel.connection, channel, name)
shim = Proc.new do |q, declare_ok|
+ @declaration_deferrable.succeed
+
case block.arity
when 1 then block.call(q)
else
block.call(q, declare_ok)
end
@@ -185,16 +241,24 @@
@channel.once_open do
if block
self.declare(@opts[:passive], @opts[:durable], @opts[:exclusive], @opts[:auto_delete], @opts[:nowait], @opts[:arguments], &shim)
else
- self.declare(@opts[:passive], @opts[:durable], @opts[:exclusive], @opts[:auto_delete], @opts[:nowait], @opts[:arguments])
+ injected_callback = Proc.new { @declaration_deferrable.succeed }
+ # we cannot pass :nowait as true here, AMQ::Client::Queue will (rightfully) raise an exception because
+ # it has no idea about crazy edge cases we are trying to support for sake of backwards compatibility. MK.
+ self.declare(@opts[:passive], @opts[:durable], @opts[:exclusive], @opts[:auto_delete], false, @opts[:arguments], &injected_callback)
end
end
end
+ # @return [Boolean] true if this queue is server-named
+ def server_named?
+ @server_named
+ end # server_named?
+
# This method binds a queue to an exchange. Until a queue is
# bound it will not receive any messages. In a classic messaging
# model, store-and-forward queues are bound to a dest exchange
# and subscription queues are bound to a dest_wild exchange.
#
@@ -232,18 +296,25 @@
#
# @api public
# @see Queue#unbind
def bind(exchange, opts = {}, &block)
@status = :unbound
- @sync_bind = !opts[:nowait]
# amq-client's Queue already does exchange.respond_to?(:name) ? exchange.name : exchange
# for us
exchange = exchange
@bindings[exchange] = opts
- @channel.once_open do
- super(exchange, (opts[:key] || opts[:routing_key] || AMQ::Protocol::EMPTY_STRING), (opts[:nowait] || block.nil?), opts[:arguments], &block)
+ if self.server_named?
+ @channel.once_open do
+ @declaration_deferrable.callback do
+ super(exchange, (opts[:key] || opts[:routing_key] || AMQ::Protocol::EMPTY_STRING), (opts[:nowait] || block.nil?), opts[:arguments], &block)
+ end
+ end
+ else
+ @channel.once_open do
+ super(exchange, (opts[:key] || opts[:routing_key] || AMQ::Protocol::EMPTY_STRING), (opts[:nowait] || block.nil?), opts[:arguments], &block)
+ end
end
self
end
@@ -595,17 +666,9 @@
#
# @api public
# @deprecated
def callback
@on_declare
- end
-
- # Compatibility alias for #on_bind.
- #
- # @api public
- # @deprecated
- def bind_callback
- @on_bind
end