lib/amqp/queue.rb in amqp-0.8.0.rc13 vs lib/amqp/queue.rb in amqp-0.8.0.rc14

- old
+ new

@@ -1,8 +1,9 @@ # encoding: utf-8 require "amq/client/queue" +require "amqp/consumer" module AMQP # h2. What are AMQP queues? # # Queues store and forward messages to consumers. They are similar to mailboxes in SMTP. @@ -171,19 +172,16 @@ # @api public def initialize(channel, name = AMQ::Protocol::EMPTY_STRING, opts = {}, &block) raise ArgumentError.new("queue name must not be nil; if you want broker to generate queue name for you, pass an empty string") if name.nil? @channel = channel - name = AMQ::Protocol::EMPTY_STRING if name.nil? @name = name unless name.empty? @server_named = name.empty? @opts = self.class.add_default_options(name, opts, block) raise ArgumentError.new("server-named queues (name = '') declaration with :nowait => true makes no sense. If you are not sure what that means, simply drop :nowait => true from opts.") if @server_named && @opts[:nowait] - @bindings = Hash.new - # a deferrable that we use to delay operations until this queue is actually declared. # one reason for this is to support a case when a server-named queue is immediately bound. # it's crazy, but 0.7.x supports it, so... MK. @declaration_deferrable = AMQ::Client::EventMachineClient::Deferrable.new @@ -216,10 +214,19 @@ self.declare(@opts[:passive], @opts[:durable], @opts[:exclusive], @opts[:auto_delete], false, @opts[:arguments], &injected_callback) end end end + # Defines a callback that will be executed once queue is declared. More than one callback can be defined. + # if queue is already declared, given callback is executed immediately. + # + # @api public + def once_declared(&block) + @declaration_deferrable.callback(&block) + end # once_declared(&block) + + # @return [Boolean] true if this queue is server-named def server_named? @server_named end # server_named? @@ -272,16 +279,10 @@ # @yield [] Since queue.bind-ok carries no attributes, no parameters are yielded to the block. # # @api public # @see Queue#unbind def bind(exchange, opts = {}, &block) - @status = :unbound - # amq-client's Queue already does exchange.respond_to?(:name) ? exchange.name : exchange - # for us - exchange = exchange - @bindings[exchange] = opts - if self.server_named? @channel.once_open do @declaration_deferrable.callback do super(exchange, (opts[:key] || opts[:routing_key] || AMQ::Protocol::EMPTY_STRING), (opts[:nowait] || block.nil?), opts[:arguments], &block) end @@ -294,10 +295,48 @@ self end + # @group Error Handling and Recovery + + # Used by automatic recovery machinery. + # @private + # @api plugin + def rebind(&block) + @bindings.each { |b| self.bind(b[:exchange], b) } + end + + # Called by associated connection object when AMQP connection has been re-established + # (for example, after a network failure). + # + # @api plugin + def auto_recover + self.exec_callback_yielding_self(:before_recovery) + + if self.server_named? + old_name = @name.dup + @name = AMQ::Protocol::EMPTY_STRING + + @channel.queues.delete(old_name) + end + + self.redeclare do + @declaration_deferrable.succeed + self.rebind + + @consumers.each { |tag, consumer| consumer.auto_recover } + + self.exec_callback_yielding_self(:after_recovery) + end + end # auto_recover + + # @endgroup + + + + # Remove the binding between the queue and exchange. The queue will # not receive any more messages until it is bound to another # exchange. # # Due to the asynchronous nature of the protocol, it is possible for @@ -388,49 +427,23 @@ # This method provides a direct access to the messages in a queue # using a synchronous dialogue that is designed for specific types of # application where synchronous functionality is more important than # performance. # - # If provided block takes one argument, it is passed message payload every time {Queue#pop} is called. + # If queue is empty, `payload` callback argument will be nil, otherwise arguments + # are identical to those of {AMQP::Queue#subscribe} callback. # - # @example Use of callback with a single argument + # @example Fetching messages off AMQP queue on demand # - # EM.run do - # exchange = AMQP::Channel.direct("foo queue") - # EM.add_periodic_timer(1) do - # exchange.publish("random number #{rand(1000)}") - # end + # queue.pop do |metadata, payload| + # if payload + # puts "Fetched a message: #{payload.inspect}, content_type: #{metadata.content_type}. Shutting down..." + # else + # puts "No messages in the queue" + # end + # end # - # # note that #bind is never called; it is implicit because - # # the exchange and queue names match - # queue = AMQP::Channel.queue('foo queue') - # queue.pop { |body| puts "received payload [#{body}]" } - # - # EM.add_periodic_timer(1) { queue.pop } - # end - # - # If the block takes 2 parameters, both the header and the body will - # be passed in for processing. The header object is defined by - # AMQP::Protocol::Header. - # - # @example Use of callback with two arguments - # - # EM.run do - # exchange = AMQP::Channel.direct("foo queue") - # EM.add_periodic_timer(1) do - # exchange.publish("random number #{rand(1000)}") - # end - # - # queue = AMQP::Channel.queue('foo queue') - # queue.pop do |header, body| - # p header - # puts "received payload [#{body}]" - # end - # - # EM.add_periodic_timer(1) { queue.pop } - # end - # # @option opts [Boolean] :ack (false) If this field is set to false the server does not expect acknowledgments # for messages. That is, when a message is delivered to the client # the server automatically and silently acknowledges it on behalf # of the client. This functionality increases performance but at # the cost of reliability. Messages can get lost if a client dies @@ -477,10 +490,16 @@ # # The provided block is passed a single message each time the # exchange matches a message to this queue. # # + # Attempts to {Queue#subscribe} multiple times to the same exchange will raise an + # Exception. If you need more than one consumer per queue, use {AMQP::Consumer} instead. + # {file:docs/Queues.textile Documentation guide on queues} explains this and other topics + # in great detail. + # + # # @example Use of callback with a single argument # # EventMachine.run do # exchange = AMQP::Channel.direct("foo queue") # EM.add_periodic_timer(1) do @@ -490,12 +509,11 @@ # queue = AMQP::Channel.queue('foo queue') # queue.subscribe { |body| puts "received payload [#{body}]" } # end # # If the block takes 2 parameters, both the header and the body will - # be passed in for processing. The header object is defined by - # AMQP::Protocol::Header. + # be passed in for processing. # # @example Use of callback with two arguments # # EventMachine.run do # connection = AMQP.connect(:host => '127.0.0.1') @@ -546,11 +564,125 @@ # :venue => "Stockholm" # }, # :timestamp => Time.now.to_i) # end # + # @example Using object as consumer (message handler), take one # + # class Consumer + # + # # + # # API + # # + # + # def initialize(channel, queue_name = AMQ::Protocol::EMPTY_STRING) + # @queue_name = queue_name + # + # @channel = channel + # # Consumer#handle_channel_exception will handle channel + # # exceptions. Keep in mind that you can only register one error handler, + # # so the last one registered "wins". + # @channel.on_error(&method(:handle_channel_exception)) + # end # initialize + # + # def start + # @queue = @channel.queue(@queue_name, :exclusive => true) + # # #handle_message method will be handling messages routed to @queue + # @queue.subscribe(&method(:handle_message)) + # end # start + # + # + # + # # + # # Implementation + # # + # + # def handle_message(metadata, payload) + # puts "Received a message: #{payload}, content_type = #{metadata.content_type}" + # end # handle_message(metadata, payload) + # + # def handle_channel_exception(channel, channel_close) + # puts "Oops... a channel-level exception: code = #{channel_close.reply_code}, message = #{channel_close.reply_text}" + # end # handle_channel_exception(channel, channel_close) + # end + # + # + # @example Using object as consumer (message handler), take two: aggregatied handler + # class Consumer + # + # # + # # API + # # + # + # def handle_message(metadata, payload) + # puts "Received a message: #{payload}, content_type = #{metadata.content_type}" + # end # handle_message(metadata, payload) + # end + # + # + # class Worker + # + # # + # # API + # # + # + # + # def initialize(channel, queue_name = AMQ::Protocol::EMPTY_STRING, consumer = Consumer.new) + # @queue_name = queue_name + # + # @channel = channel + # @channel.on_error(&method(:handle_channel_exception)) + # + # @consumer = consumer + # end # initialize + # + # def start + # @queue = @channel.queue(@queue_name, :exclusive => true) + # @queue.subscribe(&@consumer.method(:handle_message)) + # end # start + # + # + # + # # + # # Implementation + # # + # + # def handle_channel_exception(channel, channel_close) + # puts "Oops... a channel-level exception: code = #{channel_close.reply_code}, message = #{channel_close.reply_text}" + # end # handle_channel_exception(channel, channel_close) + # end + # + # @example Unit-testing objects that are used as consumers, RSpec style + # + # require "ostruct" + # require "json" + # + # # RSpec example + # describe Consumer do + # describe "when a new message arrives" do + # subject { described_class.new } + # + # let(:metadata) do + # o = OpenStruct.new + # + # o.content_type = "application/json" + # o + # end + # let(:payload) { JSON.encode({ :command => "reload_config" }) } + # + # it "does some useful work" do + # # check preconditions here if necessary + # + # subject.handle_message(metadata, payload) + # + # # add your code expectations here + # end + # end + # end + # + # + # # @option opts [Boolean ]:ack (false) If this field is set to false the server does not expect acknowledgments # for messages. That is, when a message is delivered to the client # the server automatically and silently acknowledges it on behalf # of the client. This functionality increases performance but at # the cost of reliability. Messages can get lost if a client dies @@ -578,55 +710,65 @@ # @return [Queue] Self # @api public # # @see file:docs/Queues.textile Documentation guide on queues # @see #unsubscribe + # @see AMQP::Consumer def subscribe(opts = {}, &block) - raise Error, 'already subscribed to the queue' if @consumer_tag + raise RuntimeError.new("This queue already has default consumer. Please instantiate AMQP::Consumer directly to register additional consumers.") if @default_consumer - # having initial value for @consumer_tag makes a lot of obscure issues - # go away. It is set to real value once we receive consume-ok (it is handled by - # AMQ::Client::Queue we inherit from). - @consumer_tag = "for now" - opts[:nowait] = false if (@on_confirm_subscribe = opts[:confirm]) - # We have to maintain this multiple arities jazz - # because older versions this gem are used in examples in at least 3 - # books published by O'Reilly :(. MK. - delivery_shim = Proc.new { |method, headers, payload| - case block.arity - when 1 then - block.call(payload) - when 2 then - h = Header.new(@channel, method, headers.decode_payload) - block.call(h, payload) - else - h = Header.new(@channel, method, headers.decode_payload) - block.call(h, payload, method.consumer_tag, method.delivery_tag, method.redelivered, method.exchange, method.routing_key) - end - } - @channel.once_open do - @consumer_tag = nil - # consumer_tag is set by AMQ::Client::Queue once we receive consume-ok, this takes a while. - self.consume(!opts[:ack], opts[:exclusive], (opts[:nowait] || block.nil?), opts[:no_local], nil, &opts[:confirm]) + self.once_declared do + self.consume(!opts[:ack], opts[:exclusive], (opts[:nowait] || block.nil?), opts[:no_local], nil, &opts[:confirm]) + + self.on_delivery(&block) + end end - self.on_delivery(&delivery_shim) self end + # @return [String] Consumer tag of the default consumer associated with this queue (if any), or nil + # @note Default consumer is the one registered with the convenience {AMQP::Queue#subscribe} method. It has no special properties of any kind. + # @see Queue#subscribe + # @see AMQP::Consumer + # @api public + def consumer_tag + if @default_consumer + @default_consumer.consumer_tag + else + nil + end + end # consumer_tag - # Removes the subscription from the queue and cancels the consumer. - # New messages will not be received by this queue instance. - # - # Due to the asynchronous nature of the protocol, it is possible for - # "in flight" messages to be received after this call completes. + # @return [AMQP::Consumer] Default consumer associated with this queue (if any), or nil + # @note Default consumer is the one registered with the convenience {AMQP::Queue#subscribe} method. It has no special properties of any kind. + # @see Queue#subscribe + # @see AMQP::Consumer + # @api public + def default_consumer + @default_consumer + end + + + # @return [Class] + # @private + def self.consumer_class + AMQP::Consumer + end # self.consumer_class + + + # Removes the subscription from the queue and cancels the consumer. Once consumer is cancelled, + # messages will no longer be delivered to it, however, due to the asynchronous nature of the protocol, it is possible for + # “in flight” messages to be received after this call completes. # Those messages will be serviced by the last block used in a # {Queue#subscribe} or {Queue#pop} call. # + # Fetching messages with {AMQP::Queue#pop} is still possible even after consumer is cancelled. + # # Additionally, if the queue was created with _autodelete_ set to # true, the server will delete the queue after its wait period # has expired unless the queue is bound to an active exchange. # # The method accepts a block which will be executed when the @@ -640,13 +782,17 @@ # @yieldparam [AMQP::Protocol::Basic::CancelOk] cancel_ok AMQP method basic.cancel-ok. You can obtain consumer tag from it. # # # @api public def unsubscribe(opts = {}, &block) - # @consumer_tag is nillified for us by AMQ::Client::Queue, that is, - # our superclass. MK. - @channel.once_open { self.cancel(opts.fetch(:nowait, true), &block) } + @channel.once_open do + self.once_declared do + if @default_consumer + @default_consumer.cancel(opts.fetch(:nowait, true), &block); @default_consumer = nil + end + end + end end # Get the number of messages and active consumers (with active channel flow) on a queue. # # @example Getting number of messages and active consumers for a queue @@ -668,29 +814,31 @@ @channel.once_open { self.declare(true, @durable, @exclusive, @auto_delete, false, nil, &shim) } end # Boolean check to see if the current queue has already subscribed - # to messages delivery. + # to messages delivery (has default consumer). # # Attempts to {Queue#subscribe} multiple times to the same exchange will raise an - # Exception. Only a single block at a time can be associated with any - # queue instance for processing incoming messages. + # Exception. If you need more than one consumer per queue, use {AMQP::Consumer} instead. # # @return [Boolean] true if there is a consumer tag associated with this Queue instance # @api public + # @deprecated def subscribed? - !!@consumer_tag + @default_consumer && @default_consumer.subscribed? end # Compatibility alias for #on_declare. # # @api public # @deprecated def callback - @on_declare + return nil if !subscribed? + + @default_consumer.callback end @@ -708,9 +856,17 @@ # @api plugin def reset initialize(@channel, @name, @opts) end + + # @private + # @api plugin + def handle_connection_interruption(method = nil) + super(method) + + @declaration_deferrable = EventMachine::DefaultDeferrable.new + end protected # @private def self.add_default_options(name, opts, block)