lib/amqp/queue.rb in amqp-0.9.10 vs lib/amqp/queue.rb in amqp-1.0.0.pre1

- old
+ new

@@ -114,11 +114,11 @@ # # @note Please make sure you read {file:docs/Durability.textile Durability guide} that covers exchanges durability vs. messages # persistence. # # - # @see http://files.travis-ci.org/docs/amqp/0.9.1/AMQP091Specification.pdf AMQP 0.9.1 specification (Section 2.1.1) + # @see http://bit.ly/hw2ELX AMQP 0.9.1 specification (Section 2.1.1) # @see AMQP::Exchange class Queue < AMQ::Client::Queue # # API @@ -219,15 +219,11 @@ # Defines a callback that will be executed once queue is declared. More than one callback can be defined. # if queue is already declared, given callback is executed immediately. # # @api public def once_declared(&block) - @declaration_deferrable.callback do - # guards against cases when deferred operations - # don't complete before the channel is closed - block.call if @channel.open? - end + @declaration_deferrable.callback(&block) end # once_declared(&block) # @return [Boolean] true if this queue is server-named def server_named? @@ -283,12 +279,18 @@ # @yield [] Since queue.bind-ok carries no attributes, no parameters are yielded to the block. # # @api public # @see Queue#unbind def bind(exchange, opts = {}, &block) - @channel.once_open do - self.once_name_is_available do + if self.server_named? + @channel.once_open do + @declaration_deferrable.callback do + super(exchange, (opts[:key] || opts[:routing_key] || AMQ::Protocol::EMPTY_STRING), (opts[:nowait] || block.nil?), opts[:arguments], &block) + end + end + else + @channel.once_open do super(exchange, (opts[:key] || opts[:routing_key] || AMQ::Protocol::EMPTY_STRING), (opts[:nowait] || block.nil?), opts[:arguments], &block) end end self @@ -353,13 +355,11 @@ # # @api public # @see Queue#bind def unbind(exchange, opts = {}, &block) @channel.once_open do - self.once_name_is_available do - super(exchange, (opts[:key] || opts[:routing_key] || AMQ::Protocol::EMPTY_STRING), opts[:arguments], &block) - end + super(exchange, (opts[:key] || opts[:routing_key] || AMQ::Protocol::EMPTY_STRING), opts[:arguments], &block) end end # This method deletes a queue. When a queue is deleted any pending @@ -389,13 +389,11 @@ # @api public # @see Queue#purge # @see Queue#unbind def delete(opts = {}, &block) @channel.once_open do - self.once_name_is_available do - super(opts.fetch(:if_unused, false), opts.fetch(:if_empty, false), opts.fetch(:nowait, false), &block) - end + super(opts.fetch(:if_unused, false), opts.fetch(:if_empty, false), opts.fetch(:nowait, false), &block) end # backwards compatibility nil end @@ -416,13 +414,11 @@ # @api public # @see Queue#delete # @see Queue#unbind def purge(opts = {}, &block) @channel.once_open do - self.once_declared do - super(opts.fetch(:nowait, false), &block) - end + super(opts.fetch(:nowait, false), &block) end # backwards compatibility nil end @@ -479,21 +475,15 @@ block.call(h, payload, method.delivery_tag, method.redelivered, method.exchange, method.routing_key) end } @channel.once_open do - self.once_name_is_available do - # see AMQ::Client::Queue#get in amq-client - self.get(!opts.fetch(:ack, false), &shim) - end + # see AMQ::Client::Queue#get in amq-client + self.get(!opts.fetch(:ack, false), &shim) end else - @channel.once_open do - self.once_name_is_available do - self.get(!opts.fetch(:ack, false)) - end - end + @channel.once_open { self.get(!opts.fetch(:ack, false)) } end end # Subscribes to asynchronous message delivery. @@ -727,13 +717,11 @@ raise RuntimeError.new("This queue already has default consumer. Please instantiate AMQP::Consumer directly to register additional consumers.") if @default_consumer opts[:nowait] = false if (@on_confirm_subscribe = opts[:confirm]) @channel.once_open do - self.once_name_is_available do - # guards against a pathological case race condition when a channel - # is opened and closed before delayed operations are completed. + self.once_declared do self.consume(!opts[:ack], opts[:exclusive], (opts[:nowait] || block.nil?), opts[:no_local], nil, &opts[:confirm]) self.on_delivery(&block) end end @@ -795,11 +783,11 @@ # # # @api public def unsubscribe(opts = {}, &block) @channel.once_open do - self.once_name_is_available do + self.once_declared do if @default_consumer @default_consumer.cancel(opts.fetch(:nowait, true), &block); @default_consumer = nil end end end @@ -822,18 +810,16 @@ raise ArgumentError, "AMQP::Queue#status does not make any sense without a block" unless block shim = Proc.new { |q, declare_ok| block.call(declare_ok.message_count, declare_ok.consumer_count) } @channel.once_open do - self.once_name_is_available do - # we do not use self.declare here to avoid caching of @passive since that will cause unexpected side-effects during automatic - # recovery process. MK. - @connection.send_frame(AMQ::Protocol::Queue::Declare.encode(@channel.id, @name, true, @opts[:durable], @opts[:exclusive], @opts[:auto_delete], false, @opts[:arguments])) + # we do not use self.declare here to avoid caching of @passive since that will cause unexpected side-effects during automatic + # recovery process. MK. + @connection.send_frame(AMQ::Protocol::Queue::Declare.encode(@channel.id, @name, true, @opts[:durable], @opts[:exclusive], @opts[:auto_delete], false, @opts[:arguments])) - self.append_callback(:declare, &shim) - @channel.queues_awaiting_declare_ok.push(self) - end + self.append_callback(:declare, &shim) + @channel.queues_awaiting_declare_ok.push(self) end self end @@ -893,19 +879,9 @@ protected # @private def self.add_default_options(name, opts, block) { :queue => name, :nowait => (block.nil? && !name.empty?) }.merge(opts) - end - - def once_name_is_available(&block) - if server_named? - self.once_declared do - block.call - end - else - block.call - end end private # Default direct exchange that we use to publish messages directly to this queue.