lib/amqp/queue.rb in amqp-1.0.0.pre2 vs lib/amqp/queue.rb in amqp-1.0.0

- old
+ new

@@ -95,11 +95,11 @@ # RabbitMQ FAQ also has a section on {http://www.rabbitmq.com/faq.html#scenarios use cases}. # # # h2. Queue durability and persistence of messages. # - # Learn more in our {file:docs/Durability.textile Durability guide}. + # Learn more in our {http://rubyamqp.info/articles/durability/}. # # # h2. Message ordering # # RabbitMQ FAQ explains {http://www.rabbitmq.com/faq.html#message-ordering ordering of messages in AMQP queues} @@ -110,15 +110,15 @@ # When channel-level error occurs, queues associated with that channel are reset: internal state and callbacks # are cleared. Recommended strategy is to open a new channel and re-declare all the entities you need. # Learn more in {file:docs/ErrorHandling.textile Error Handling guide}. # # - # @note Please make sure you read {file:docs/Durability.textile Durability guide} that covers exchanges durability vs. messages + # @note Please make sure you read {http://rubyamqp.info/articles/durability/} that covers exchanges durability vs. messages # persistence. # # - # @see http://bit.ly/hw2ELX AMQP 0.9.1 specification (Section 2.1.1) + # @see http://files.travis-ci.org/docs/amqp/0.9.1/AMQP091Specification.pdf AMQP 0.9.1 specification (Section 2.1.1) # @see AMQP::Exchange class Queue < AMQ::Client::Queue # # API @@ -219,11 +219,15 @@ # Defines a callback that will be executed once queue is declared. More than one callback can be defined. # if queue is already declared, given callback is executed immediately. # # @api public def once_declared(&block) - @declaration_deferrable.callback(&block) + @declaration_deferrable.callback do + # guards against cases when deferred operations + # don't complete before the channel is closed + block.call if @channel.open? + end end # once_declared(&block) # @return [Boolean] true if this queue is server-named def server_named? @@ -279,18 +283,12 @@ # @yield [] Since queue.bind-ok carries no attributes, no parameters are yielded to the block. # # @api public # @see Queue#unbind def bind(exchange, opts = {}, &block) - if self.server_named? - @channel.once_open do - @declaration_deferrable.callback do - super(exchange, (opts[:key] || opts[:routing_key] || AMQ::Protocol::EMPTY_STRING), (opts[:nowait] || block.nil?), opts[:arguments], &block) - end - end - else - @channel.once_open do + @channel.once_open do + self.once_name_is_available do super(exchange, (opts[:key] || opts[:routing_key] || AMQ::Protocol::EMPTY_STRING), (opts[:nowait] || block.nil?), opts[:arguments], &block) end end self @@ -343,11 +341,12 @@ # "in flight" messages to be received after this call completes. # Those messages will be serviced by the last block used in a # {Queue#subscribe} or {Queue#pop} call. # # @param [Exchange] Exchange to unbind from. - # + # @option opts [String] :routing_key Binding routing key + # @option opts [Hash] :arguments Binding arguments # @option opts [Boolean] :nowait (true) If set, the server will not respond to the method. The client should # not wait for a reply method. If the server could not complete the # method it will raise a channel or connection exception. # # @@ -355,11 +354,13 @@ # # @api public # @see Queue#bind def unbind(exchange, opts = {}, &block) @channel.once_open do - super(exchange, (opts[:key] || opts[:routing_key] || AMQ::Protocol::EMPTY_STRING), opts[:arguments], &block) + self.once_name_is_available do + super(exchange, (opts[:key] || opts[:routing_key] || AMQ::Protocol::EMPTY_STRING), opts[:arguments], &block) + end end end # This method deletes a queue. When a queue is deleted any pending @@ -389,11 +390,13 @@ # @api public # @see Queue#purge # @see Queue#unbind def delete(opts = {}, &block) @channel.once_open do - super(opts.fetch(:if_unused, false), opts.fetch(:if_empty, false), opts.fetch(:nowait, false), &block) + self.once_name_is_available do + super(opts.fetch(:if_unused, false), opts.fetch(:if_empty, false), opts.fetch(:nowait, false), &block) + end end # backwards compatibility nil end @@ -414,11 +417,13 @@ # @api public # @see Queue#delete # @see Queue#unbind def purge(opts = {}, &block) @channel.once_open do - super(opts.fetch(:nowait, false), &block) + self.once_declared do + super(opts.fetch(:nowait, false), &block) + end end # backwards compatibility nil end @@ -475,15 +480,21 @@ block.call(h, payload, method.delivery_tag, method.redelivered, method.exchange, method.routing_key) end } @channel.once_open do - # see AMQ::Client::Queue#get in amq-client - self.get(!opts.fetch(:ack, false), &shim) + self.once_name_is_available do + # see AMQ::Client::Queue#get in amq-client + self.get(!opts.fetch(:ack, false), &shim) + end end else - @channel.once_open { self.get(!opts.fetch(:ack, false)) } + @channel.once_open do + self.once_name_is_available do + self.get(!opts.fetch(:ack, false)) + end + end end end # Subscribes to asynchronous message delivery. @@ -717,11 +728,13 @@ raise RuntimeError.new("This queue already has default consumer. Please instantiate AMQP::Consumer directly to register additional consumers.") if @default_consumer opts[:nowait] = false if (@on_confirm_subscribe = opts[:confirm]) @channel.once_open do - self.once_declared do + self.once_name_is_available do + # guards against a pathological case race condition when a channel + # is opened and closed before delayed operations are completed. self.consume(!opts[:ack], opts[:exclusive], (opts[:nowait] || block.nil?), opts[:no_local], nil, &opts[:confirm]) self.on_delivery(&block) end end @@ -783,11 +796,11 @@ # # # @api public def unsubscribe(opts = {}, &block) @channel.once_open do - self.once_declared do + self.once_name_is_available do if @default_consumer @default_consumer.cancel(opts.fetch(:nowait, true), &block); @default_consumer = nil end end end @@ -810,16 +823,18 @@ raise ArgumentError, "AMQP::Queue#status does not make any sense without a block" unless block shim = Proc.new { |q, declare_ok| block.call(declare_ok.message_count, declare_ok.consumer_count) } @channel.once_open do - # we do not use self.declare here to avoid caching of @passive since that will cause unexpected side-effects during automatic - # recovery process. MK. - @connection.send_frame(AMQ::Protocol::Queue::Declare.encode(@channel.id, @name, true, @opts[:durable], @opts[:exclusive], @opts[:auto_delete], false, @opts[:arguments])) + self.once_name_is_available do + # we do not use self.declare here to avoid caching of @passive since that will cause unexpected side-effects during automatic + # recovery process. MK. + @connection.send_frame(AMQ::Protocol::Queue::Declare.encode(@channel.id, @name, true, @opts[:durable], @opts[:exclusive], @opts[:auto_delete], false, @opts[:arguments])) - self.append_callback(:declare, &shim) - @channel.queues_awaiting_declare_ok.push(self) + self.append_callback(:declare, &shim) + @channel.queues_awaiting_declare_ok.push(self) + end end self end @@ -879,9 +894,19 @@ protected # @private def self.add_default_options(name, opts, block) { :queue => name, :nowait => (block.nil? && !name.empty?) }.merge(opts) + end + + def once_name_is_available(&block) + if server_named? + self.once_declared do + block.call + end + else + block.call + end end private # Default direct exchange that we use to publish messages directly to this queue.