lib/amqp/queue.rb in amqp-1.0.0.pre2 vs lib/amqp/queue.rb in amqp-1.0.0
- old
+ new
@@ -95,11 +95,11 @@
# RabbitMQ FAQ also has a section on {http://www.rabbitmq.com/faq.html#scenarios use cases}.
#
#
# h2. Queue durability and persistence of messages.
#
- # Learn more in our {file:docs/Durability.textile Durability guide}.
+ # Learn more in our {http://rubyamqp.info/articles/durability/}.
#
#
# h2. Message ordering
#
# RabbitMQ FAQ explains {http://www.rabbitmq.com/faq.html#message-ordering ordering of messages in AMQP queues}
@@ -110,15 +110,15 @@
# When channel-level error occurs, queues associated with that channel are reset: internal state and callbacks
# are cleared. Recommended strategy is to open a new channel and re-declare all the entities you need.
# Learn more in {file:docs/ErrorHandling.textile Error Handling guide}.
#
#
- # @note Please make sure you read {file:docs/Durability.textile Durability guide} that covers exchanges durability vs. messages
+ # @note Please make sure you read {http://rubyamqp.info/articles/durability/} that covers exchanges durability vs. messages
# persistence.
#
#
- # @see http://bit.ly/hw2ELX AMQP 0.9.1 specification (Section 2.1.1)
+ # @see http://files.travis-ci.org/docs/amqp/0.9.1/AMQP091Specification.pdf AMQP 0.9.1 specification (Section 2.1.1)
# @see AMQP::Exchange
class Queue < AMQ::Client::Queue
#
# API
@@ -219,11 +219,15 @@
# Defines a callback that will be executed once queue is declared. More than one callback can be defined.
# if queue is already declared, given callback is executed immediately.
#
# @api public
def once_declared(&block)
- @declaration_deferrable.callback(&block)
+ @declaration_deferrable.callback do
+ # guards against cases when deferred operations
+ # don't complete before the channel is closed
+ block.call if @channel.open?
+ end
end # once_declared(&block)
# @return [Boolean] true if this queue is server-named
def server_named?
@@ -279,18 +283,12 @@
# @yield [] Since queue.bind-ok carries no attributes, no parameters are yielded to the block.
#
# @api public
# @see Queue#unbind
def bind(exchange, opts = {}, &block)
- if self.server_named?
- @channel.once_open do
- @declaration_deferrable.callback do
- super(exchange, (opts[:key] || opts[:routing_key] || AMQ::Protocol::EMPTY_STRING), (opts[:nowait] || block.nil?), opts[:arguments], &block)
- end
- end
- else
- @channel.once_open do
+ @channel.once_open do
+ self.once_name_is_available do
super(exchange, (opts[:key] || opts[:routing_key] || AMQ::Protocol::EMPTY_STRING), (opts[:nowait] || block.nil?), opts[:arguments], &block)
end
end
self
@@ -343,11 +341,12 @@
# "in flight" messages to be received after this call completes.
# Those messages will be serviced by the last block used in a
# {Queue#subscribe} or {Queue#pop} call.
#
# @param [Exchange] Exchange to unbind from.
- #
+ # @option opts [String] :routing_key Binding routing key
+ # @option opts [Hash] :arguments Binding arguments
# @option opts [Boolean] :nowait (true) If set, the server will not respond to the method. The client should
# not wait for a reply method. If the server could not complete the
# method it will raise a channel or connection exception.
#
#
@@ -355,11 +354,13 @@
#
# @api public
# @see Queue#bind
def unbind(exchange, opts = {}, &block)
@channel.once_open do
- super(exchange, (opts[:key] || opts[:routing_key] || AMQ::Protocol::EMPTY_STRING), opts[:arguments], &block)
+ self.once_name_is_available do
+ super(exchange, (opts[:key] || opts[:routing_key] || AMQ::Protocol::EMPTY_STRING), opts[:arguments], &block)
+ end
end
end
# This method deletes a queue. When a queue is deleted any pending
@@ -389,11 +390,13 @@
# @api public
# @see Queue#purge
# @see Queue#unbind
def delete(opts = {}, &block)
@channel.once_open do
- super(opts.fetch(:if_unused, false), opts.fetch(:if_empty, false), opts.fetch(:nowait, false), &block)
+ self.once_name_is_available do
+ super(opts.fetch(:if_unused, false), opts.fetch(:if_empty, false), opts.fetch(:nowait, false), &block)
+ end
end
# backwards compatibility
nil
end
@@ -414,11 +417,13 @@
# @api public
# @see Queue#delete
# @see Queue#unbind
def purge(opts = {}, &block)
@channel.once_open do
- super(opts.fetch(:nowait, false), &block)
+ self.once_declared do
+ super(opts.fetch(:nowait, false), &block)
+ end
end
# backwards compatibility
nil
end
@@ -475,15 +480,21 @@
block.call(h, payload, method.delivery_tag, method.redelivered, method.exchange, method.routing_key)
end
}
@channel.once_open do
- # see AMQ::Client::Queue#get in amq-client
- self.get(!opts.fetch(:ack, false), &shim)
+ self.once_name_is_available do
+ # see AMQ::Client::Queue#get in amq-client
+ self.get(!opts.fetch(:ack, false), &shim)
+ end
end
else
- @channel.once_open { self.get(!opts.fetch(:ack, false)) }
+ @channel.once_open do
+ self.once_name_is_available do
+ self.get(!opts.fetch(:ack, false))
+ end
+ end
end
end
# Subscribes to asynchronous message delivery.
@@ -717,11 +728,13 @@
raise RuntimeError.new("This queue already has default consumer. Please instantiate AMQP::Consumer directly to register additional consumers.") if @default_consumer
opts[:nowait] = false if (@on_confirm_subscribe = opts[:confirm])
@channel.once_open do
- self.once_declared do
+ self.once_name_is_available do
+ # guards against a pathological case race condition when a channel
+ # is opened and closed before delayed operations are completed.
self.consume(!opts[:ack], opts[:exclusive], (opts[:nowait] || block.nil?), opts[:no_local], nil, &opts[:confirm])
self.on_delivery(&block)
end
end
@@ -783,11 +796,11 @@
#
#
# @api public
def unsubscribe(opts = {}, &block)
@channel.once_open do
- self.once_declared do
+ self.once_name_is_available do
if @default_consumer
@default_consumer.cancel(opts.fetch(:nowait, true), &block); @default_consumer = nil
end
end
end
@@ -810,16 +823,18 @@
raise ArgumentError, "AMQP::Queue#status does not make any sense without a block" unless block
shim = Proc.new { |q, declare_ok| block.call(declare_ok.message_count, declare_ok.consumer_count) }
@channel.once_open do
- # we do not use self.declare here to avoid caching of @passive since that will cause unexpected side-effects during automatic
- # recovery process. MK.
- @connection.send_frame(AMQ::Protocol::Queue::Declare.encode(@channel.id, @name, true, @opts[:durable], @opts[:exclusive], @opts[:auto_delete], false, @opts[:arguments]))
+ self.once_name_is_available do
+ # we do not use self.declare here to avoid caching of @passive since that will cause unexpected side-effects during automatic
+ # recovery process. MK.
+ @connection.send_frame(AMQ::Protocol::Queue::Declare.encode(@channel.id, @name, true, @opts[:durable], @opts[:exclusive], @opts[:auto_delete], false, @opts[:arguments]))
- self.append_callback(:declare, &shim)
- @channel.queues_awaiting_declare_ok.push(self)
+ self.append_callback(:declare, &shim)
+ @channel.queues_awaiting_declare_ok.push(self)
+ end
end
self
end
@@ -879,9 +894,19 @@
protected
# @private
def self.add_default_options(name, opts, block)
{ :queue => name, :nowait => (block.nil? && !name.empty?) }.merge(opts)
+ end
+
+ def once_name_is_available(&block)
+ if server_named?
+ self.once_declared do
+ block.call
+ end
+ else
+ block.call
+ end
end
private
# Default direct exchange that we use to publish messages directly to this queue.