lib/amqp/queue.rb in amqp-0.9.10 vs lib/amqp/queue.rb in amqp-1.0.0.pre1
- old
+ new
@@ -114,11 +114,11 @@
#
# @note Please make sure you read {file:docs/Durability.textile Durability guide} that covers exchanges durability vs. messages
# persistence.
#
#
- # @see http://files.travis-ci.org/docs/amqp/0.9.1/AMQP091Specification.pdf AMQP 0.9.1 specification (Section 2.1.1)
+ # @see http://bit.ly/hw2ELX AMQP 0.9.1 specification (Section 2.1.1)
# @see AMQP::Exchange
class Queue < AMQ::Client::Queue
#
# API
@@ -219,15 +219,11 @@
# Defines a callback that will be executed once queue is declared. More than one callback can be defined.
# if queue is already declared, given callback is executed immediately.
#
# @api public
def once_declared(&block)
- @declaration_deferrable.callback do
- # guards against cases when deferred operations
- # don't complete before the channel is closed
- block.call if @channel.open?
- end
+ @declaration_deferrable.callback(&block)
end # once_declared(&block)
# @return [Boolean] true if this queue is server-named
def server_named?
@@ -283,12 +279,18 @@
# @yield [] Since queue.bind-ok carries no attributes, no parameters are yielded to the block.
#
# @api public
# @see Queue#unbind
def bind(exchange, opts = {}, &block)
- @channel.once_open do
- self.once_name_is_available do
+ if self.server_named?
+ @channel.once_open do
+ @declaration_deferrable.callback do
+ super(exchange, (opts[:key] || opts[:routing_key] || AMQ::Protocol::EMPTY_STRING), (opts[:nowait] || block.nil?), opts[:arguments], &block)
+ end
+ end
+ else
+ @channel.once_open do
super(exchange, (opts[:key] || opts[:routing_key] || AMQ::Protocol::EMPTY_STRING), (opts[:nowait] || block.nil?), opts[:arguments], &block)
end
end
self
@@ -353,13 +355,11 @@
#
# @api public
# @see Queue#bind
def unbind(exchange, opts = {}, &block)
@channel.once_open do
- self.once_name_is_available do
- super(exchange, (opts[:key] || opts[:routing_key] || AMQ::Protocol::EMPTY_STRING), opts[:arguments], &block)
- end
+ super(exchange, (opts[:key] || opts[:routing_key] || AMQ::Protocol::EMPTY_STRING), opts[:arguments], &block)
end
end
# This method deletes a queue. When a queue is deleted any pending
@@ -389,13 +389,11 @@
# @api public
# @see Queue#purge
# @see Queue#unbind
def delete(opts = {}, &block)
@channel.once_open do
- self.once_name_is_available do
- super(opts.fetch(:if_unused, false), opts.fetch(:if_empty, false), opts.fetch(:nowait, false), &block)
- end
+ super(opts.fetch(:if_unused, false), opts.fetch(:if_empty, false), opts.fetch(:nowait, false), &block)
end
# backwards compatibility
nil
end
@@ -416,13 +414,11 @@
# @api public
# @see Queue#delete
# @see Queue#unbind
def purge(opts = {}, &block)
@channel.once_open do
- self.once_declared do
- super(opts.fetch(:nowait, false), &block)
- end
+ super(opts.fetch(:nowait, false), &block)
end
# backwards compatibility
nil
end
@@ -479,21 +475,15 @@
block.call(h, payload, method.delivery_tag, method.redelivered, method.exchange, method.routing_key)
end
}
@channel.once_open do
- self.once_name_is_available do
- # see AMQ::Client::Queue#get in amq-client
- self.get(!opts.fetch(:ack, false), &shim)
- end
+ # see AMQ::Client::Queue#get in amq-client
+ self.get(!opts.fetch(:ack, false), &shim)
end
else
- @channel.once_open do
- self.once_name_is_available do
- self.get(!opts.fetch(:ack, false))
- end
- end
+ @channel.once_open { self.get(!opts.fetch(:ack, false)) }
end
end
# Subscribes to asynchronous message delivery.
@@ -727,13 +717,11 @@
raise RuntimeError.new("This queue already has default consumer. Please instantiate AMQP::Consumer directly to register additional consumers.") if @default_consumer
opts[:nowait] = false if (@on_confirm_subscribe = opts[:confirm])
@channel.once_open do
- self.once_name_is_available do
- # guards against a pathological case race condition when a channel
- # is opened and closed before delayed operations are completed.
+ self.once_declared do
self.consume(!opts[:ack], opts[:exclusive], (opts[:nowait] || block.nil?), opts[:no_local], nil, &opts[:confirm])
self.on_delivery(&block)
end
end
@@ -795,11 +783,11 @@
#
#
# @api public
def unsubscribe(opts = {}, &block)
@channel.once_open do
- self.once_name_is_available do
+ self.once_declared do
if @default_consumer
@default_consumer.cancel(opts.fetch(:nowait, true), &block); @default_consumer = nil
end
end
end
@@ -822,18 +810,16 @@
raise ArgumentError, "AMQP::Queue#status does not make any sense without a block" unless block
shim = Proc.new { |q, declare_ok| block.call(declare_ok.message_count, declare_ok.consumer_count) }
@channel.once_open do
- self.once_name_is_available do
- # we do not use self.declare here to avoid caching of @passive since that will cause unexpected side-effects during automatic
- # recovery process. MK.
- @connection.send_frame(AMQ::Protocol::Queue::Declare.encode(@channel.id, @name, true, @opts[:durable], @opts[:exclusive], @opts[:auto_delete], false, @opts[:arguments]))
+ # we do not use self.declare here to avoid caching of @passive since that will cause unexpected side-effects during automatic
+ # recovery process. MK.
+ @connection.send_frame(AMQ::Protocol::Queue::Declare.encode(@channel.id, @name, true, @opts[:durable], @opts[:exclusive], @opts[:auto_delete], false, @opts[:arguments]))
- self.append_callback(:declare, &shim)
- @channel.queues_awaiting_declare_ok.push(self)
- end
+ self.append_callback(:declare, &shim)
+ @channel.queues_awaiting_declare_ok.push(self)
end
self
end
@@ -893,19 +879,9 @@
protected
# @private
def self.add_default_options(name, opts, block)
{ :queue => name, :nowait => (block.nil? && !name.empty?) }.merge(opts)
- end
-
- def once_name_is_available(&block)
- if server_named?
- self.once_declared do
- block.call
- end
- else
- block.call
- end
end
private
# Default direct exchange that we use to publish messages directly to this queue.