lib/amqp/queue.rb in amqp-0.8.0.rc13 vs lib/amqp/queue.rb in amqp-0.8.0.rc14
- old
+ new
@@ -1,8 +1,9 @@
# encoding: utf-8
require "amq/client/queue"
+require "amqp/consumer"
module AMQP
# h2. What are AMQP queues?
#
# Queues store and forward messages to consumers. They are similar to mailboxes in SMTP.
@@ -171,19 +172,16 @@
# @api public
def initialize(channel, name = AMQ::Protocol::EMPTY_STRING, opts = {}, &block)
raise ArgumentError.new("queue name must not be nil; if you want broker to generate queue name for you, pass an empty string") if name.nil?
@channel = channel
- name = AMQ::Protocol::EMPTY_STRING if name.nil?
@name = name unless name.empty?
@server_named = name.empty?
@opts = self.class.add_default_options(name, opts, block)
raise ArgumentError.new("server-named queues (name = '') declaration with :nowait => true makes no sense. If you are not sure what that means, simply drop :nowait => true from opts.") if @server_named && @opts[:nowait]
- @bindings = Hash.new
-
# a deferrable that we use to delay operations until this queue is actually declared.
# one reason for this is to support a case when a server-named queue is immediately bound.
# it's crazy, but 0.7.x supports it, so... MK.
@declaration_deferrable = AMQ::Client::EventMachineClient::Deferrable.new
@@ -216,10 +214,19 @@
self.declare(@opts[:passive], @opts[:durable], @opts[:exclusive], @opts[:auto_delete], false, @opts[:arguments], &injected_callback)
end
end
end
+ # Defines a callback that will be executed once queue is declared. More than one callback can be defined.
+ # if queue is already declared, given callback is executed immediately.
+ #
+ # @api public
+ def once_declared(&block)
+ @declaration_deferrable.callback(&block)
+ end # once_declared(&block)
+
+
# @return [Boolean] true if this queue is server-named
def server_named?
@server_named
end # server_named?
@@ -272,16 +279,10 @@
# @yield [] Since queue.bind-ok carries no attributes, no parameters are yielded to the block.
#
# @api public
# @see Queue#unbind
def bind(exchange, opts = {}, &block)
- @status = :unbound
- # amq-client's Queue already does exchange.respond_to?(:name) ? exchange.name : exchange
- # for us
- exchange = exchange
- @bindings[exchange] = opts
-
if self.server_named?
@channel.once_open do
@declaration_deferrable.callback do
super(exchange, (opts[:key] || opts[:routing_key] || AMQ::Protocol::EMPTY_STRING), (opts[:nowait] || block.nil?), opts[:arguments], &block)
end
@@ -294,10 +295,48 @@
self
end
+ # @group Error Handling and Recovery
+
+ # Used by automatic recovery machinery.
+ # @private
+ # @api plugin
+ def rebind(&block)
+ @bindings.each { |b| self.bind(b[:exchange], b) }
+ end
+
+ # Called by associated connection object when AMQP connection has been re-established
+ # (for example, after a network failure).
+ #
+ # @api plugin
+ def auto_recover
+ self.exec_callback_yielding_self(:before_recovery)
+
+ if self.server_named?
+ old_name = @name.dup
+ @name = AMQ::Protocol::EMPTY_STRING
+
+ @channel.queues.delete(old_name)
+ end
+
+ self.redeclare do
+ @declaration_deferrable.succeed
+ self.rebind
+
+ @consumers.each { |tag, consumer| consumer.auto_recover }
+
+ self.exec_callback_yielding_self(:after_recovery)
+ end
+ end # auto_recover
+
+ # @endgroup
+
+
+
+
# Remove the binding between the queue and exchange. The queue will
# not receive any more messages until it is bound to another
# exchange.
#
# Due to the asynchronous nature of the protocol, it is possible for
@@ -388,49 +427,23 @@
# This method provides a direct access to the messages in a queue
# using a synchronous dialogue that is designed for specific types of
# application where synchronous functionality is more important than
# performance.
#
- # If provided block takes one argument, it is passed message payload every time {Queue#pop} is called.
+ # If queue is empty, `payload` callback argument will be nil, otherwise arguments
+ # are identical to those of {AMQP::Queue#subscribe} callback.
#
- # @example Use of callback with a single argument
+ # @example Fetching messages off AMQP queue on demand
#
- # EM.run do
- # exchange = AMQP::Channel.direct("foo queue")
- # EM.add_periodic_timer(1) do
- # exchange.publish("random number #{rand(1000)}")
- # end
+ # queue.pop do |metadata, payload|
+ # if payload
+ # puts "Fetched a message: #{payload.inspect}, content_type: #{metadata.content_type}. Shutting down..."
+ # else
+ # puts "No messages in the queue"
+ # end
+ # end
#
- # # note that #bind is never called; it is implicit because
- # # the exchange and queue names match
- # queue = AMQP::Channel.queue('foo queue')
- # queue.pop { |body| puts "received payload [#{body}]" }
- #
- # EM.add_periodic_timer(1) { queue.pop }
- # end
- #
- # If the block takes 2 parameters, both the header and the body will
- # be passed in for processing. The header object is defined by
- # AMQP::Protocol::Header.
- #
- # @example Use of callback with two arguments
- #
- # EM.run do
- # exchange = AMQP::Channel.direct("foo queue")
- # EM.add_periodic_timer(1) do
- # exchange.publish("random number #{rand(1000)}")
- # end
- #
- # queue = AMQP::Channel.queue('foo queue')
- # queue.pop do |header, body|
- # p header
- # puts "received payload [#{body}]"
- # end
- #
- # EM.add_periodic_timer(1) { queue.pop }
- # end
- #
# @option opts [Boolean] :ack (false) If this field is set to false the server does not expect acknowledgments
# for messages. That is, when a message is delivered to the client
# the server automatically and silently acknowledges it on behalf
# of the client. This functionality increases performance but at
# the cost of reliability. Messages can get lost if a client dies
@@ -477,10 +490,16 @@
#
# The provided block is passed a single message each time the
# exchange matches a message to this queue.
#
#
+ # Attempts to {Queue#subscribe} multiple times to the same exchange will raise an
+ # Exception. If you need more than one consumer per queue, use {AMQP::Consumer} instead.
+ # {file:docs/Queues.textile Documentation guide on queues} explains this and other topics
+ # in great detail.
+ #
+ #
# @example Use of callback with a single argument
#
# EventMachine.run do
# exchange = AMQP::Channel.direct("foo queue")
# EM.add_periodic_timer(1) do
@@ -490,12 +509,11 @@
# queue = AMQP::Channel.queue('foo queue')
# queue.subscribe { |body| puts "received payload [#{body}]" }
# end
#
# If the block takes 2 parameters, both the header and the body will
- # be passed in for processing. The header object is defined by
- # AMQP::Protocol::Header.
+ # be passed in for processing.
#
# @example Use of callback with two arguments
#
# EventMachine.run do
# connection = AMQP.connect(:host => '127.0.0.1')
@@ -546,11 +564,125 @@
# :venue => "Stockholm"
# },
# :timestamp => Time.now.to_i)
# end
#
+ # @example Using object as consumer (message handler), take one
#
+ # class Consumer
+ #
+ # #
+ # # API
+ # #
+ #
+ # def initialize(channel, queue_name = AMQ::Protocol::EMPTY_STRING)
+ # @queue_name = queue_name
+ #
+ # @channel = channel
+ # # Consumer#handle_channel_exception will handle channel
+ # # exceptions. Keep in mind that you can only register one error handler,
+ # # so the last one registered "wins".
+ # @channel.on_error(&method(:handle_channel_exception))
+ # end # initialize
+ #
+ # def start
+ # @queue = @channel.queue(@queue_name, :exclusive => true)
+ # # #handle_message method will be handling messages routed to @queue
+ # @queue.subscribe(&method(:handle_message))
+ # end # start
+ #
+ #
+ #
+ # #
+ # # Implementation
+ # #
+ #
+ # def handle_message(metadata, payload)
+ # puts "Received a message: #{payload}, content_type = #{metadata.content_type}"
+ # end # handle_message(metadata, payload)
+ #
+ # def handle_channel_exception(channel, channel_close)
+ # puts "Oops... a channel-level exception: code = #{channel_close.reply_code}, message = #{channel_close.reply_text}"
+ # end # handle_channel_exception(channel, channel_close)
+ # end
+ #
+ #
+ # @example Using object as consumer (message handler), take two: aggregatied handler
+ # class Consumer
+ #
+ # #
+ # # API
+ # #
+ #
+ # def handle_message(metadata, payload)
+ # puts "Received a message: #{payload}, content_type = #{metadata.content_type}"
+ # end # handle_message(metadata, payload)
+ # end
+ #
+ #
+ # class Worker
+ #
+ # #
+ # # API
+ # #
+ #
+ #
+ # def initialize(channel, queue_name = AMQ::Protocol::EMPTY_STRING, consumer = Consumer.new)
+ # @queue_name = queue_name
+ #
+ # @channel = channel
+ # @channel.on_error(&method(:handle_channel_exception))
+ #
+ # @consumer = consumer
+ # end # initialize
+ #
+ # def start
+ # @queue = @channel.queue(@queue_name, :exclusive => true)
+ # @queue.subscribe(&@consumer.method(:handle_message))
+ # end # start
+ #
+ #
+ #
+ # #
+ # # Implementation
+ # #
+ #
+ # def handle_channel_exception(channel, channel_close)
+ # puts "Oops... a channel-level exception: code = #{channel_close.reply_code}, message = #{channel_close.reply_text}"
+ # end # handle_channel_exception(channel, channel_close)
+ # end
+ #
+ # @example Unit-testing objects that are used as consumers, RSpec style
+ #
+ # require "ostruct"
+ # require "json"
+ #
+ # # RSpec example
+ # describe Consumer do
+ # describe "when a new message arrives" do
+ # subject { described_class.new }
+ #
+ # let(:metadata) do
+ # o = OpenStruct.new
+ #
+ # o.content_type = "application/json"
+ # o
+ # end
+ # let(:payload) { JSON.encode({ :command => "reload_config" }) }
+ #
+ # it "does some useful work" do
+ # # check preconditions here if necessary
+ #
+ # subject.handle_message(metadata, payload)
+ #
+ # # add your code expectations here
+ # end
+ # end
+ # end
+ #
+ #
+ #
# @option opts [Boolean ]:ack (false) If this field is set to false the server does not expect acknowledgments
# for messages. That is, when a message is delivered to the client
# the server automatically and silently acknowledges it on behalf
# of the client. This functionality increases performance but at
# the cost of reliability. Messages can get lost if a client dies
@@ -578,55 +710,65 @@
# @return [Queue] Self
# @api public
#
# @see file:docs/Queues.textile Documentation guide on queues
# @see #unsubscribe
+ # @see AMQP::Consumer
def subscribe(opts = {}, &block)
- raise Error, 'already subscribed to the queue' if @consumer_tag
+ raise RuntimeError.new("This queue already has default consumer. Please instantiate AMQP::Consumer directly to register additional consumers.") if @default_consumer
- # having initial value for @consumer_tag makes a lot of obscure issues
- # go away. It is set to real value once we receive consume-ok (it is handled by
- # AMQ::Client::Queue we inherit from).
- @consumer_tag = "for now"
-
opts[:nowait] = false if (@on_confirm_subscribe = opts[:confirm])
- # We have to maintain this multiple arities jazz
- # because older versions this gem are used in examples in at least 3
- # books published by O'Reilly :(. MK.
- delivery_shim = Proc.new { |method, headers, payload|
- case block.arity
- when 1 then
- block.call(payload)
- when 2 then
- h = Header.new(@channel, method, headers.decode_payload)
- block.call(h, payload)
- else
- h = Header.new(@channel, method, headers.decode_payload)
- block.call(h, payload, method.consumer_tag, method.delivery_tag, method.redelivered, method.exchange, method.routing_key)
- end
- }
-
@channel.once_open do
- @consumer_tag = nil
- # consumer_tag is set by AMQ::Client::Queue once we receive consume-ok, this takes a while.
- self.consume(!opts[:ack], opts[:exclusive], (opts[:nowait] || block.nil?), opts[:no_local], nil, &opts[:confirm])
+ self.once_declared do
+ self.consume(!opts[:ack], opts[:exclusive], (opts[:nowait] || block.nil?), opts[:no_local], nil, &opts[:confirm])
+
+ self.on_delivery(&block)
+ end
end
- self.on_delivery(&delivery_shim)
self
end
+ # @return [String] Consumer tag of the default consumer associated with this queue (if any), or nil
+ # @note Default consumer is the one registered with the convenience {AMQP::Queue#subscribe} method. It has no special properties of any kind.
+ # @see Queue#subscribe
+ # @see AMQP::Consumer
+ # @api public
+ def consumer_tag
+ if @default_consumer
+ @default_consumer.consumer_tag
+ else
+ nil
+ end
+ end # consumer_tag
- # Removes the subscription from the queue and cancels the consumer.
- # New messages will not be received by this queue instance.
- #
- # Due to the asynchronous nature of the protocol, it is possible for
- # "in flight" messages to be received after this call completes.
+ # @return [AMQP::Consumer] Default consumer associated with this queue (if any), or nil
+ # @note Default consumer is the one registered with the convenience {AMQP::Queue#subscribe} method. It has no special properties of any kind.
+ # @see Queue#subscribe
+ # @see AMQP::Consumer
+ # @api public
+ def default_consumer
+ @default_consumer
+ end
+
+
+ # @return [Class]
+ # @private
+ def self.consumer_class
+ AMQP::Consumer
+ end # self.consumer_class
+
+
+ # Removes the subscription from the queue and cancels the consumer. Once consumer is cancelled,
+ # messages will no longer be delivered to it, however, due to the asynchronous nature of the protocol, it is possible for
+ # “in flight” messages to be received after this call completes.
# Those messages will be serviced by the last block used in a
# {Queue#subscribe} or {Queue#pop} call.
#
+ # Fetching messages with {AMQP::Queue#pop} is still possible even after consumer is cancelled.
+ #
# Additionally, if the queue was created with _autodelete_ set to
# true, the server will delete the queue after its wait period
# has expired unless the queue is bound to an active exchange.
#
# The method accepts a block which will be executed when the
@@ -640,13 +782,17 @@
# @yieldparam [AMQP::Protocol::Basic::CancelOk] cancel_ok AMQP method basic.cancel-ok. You can obtain consumer tag from it.
#
#
# @api public
def unsubscribe(opts = {}, &block)
- # @consumer_tag is nillified for us by AMQ::Client::Queue, that is,
- # our superclass. MK.
- @channel.once_open { self.cancel(opts.fetch(:nowait, true), &block) }
+ @channel.once_open do
+ self.once_declared do
+ if @default_consumer
+ @default_consumer.cancel(opts.fetch(:nowait, true), &block); @default_consumer = nil
+ end
+ end
+ end
end
# Get the number of messages and active consumers (with active channel flow) on a queue.
#
# @example Getting number of messages and active consumers for a queue
@@ -668,29 +814,31 @@
@channel.once_open { self.declare(true, @durable, @exclusive, @auto_delete, false, nil, &shim) }
end
# Boolean check to see if the current queue has already subscribed
- # to messages delivery.
+ # to messages delivery (has default consumer).
#
# Attempts to {Queue#subscribe} multiple times to the same exchange will raise an
- # Exception. Only a single block at a time can be associated with any
- # queue instance for processing incoming messages.
+ # Exception. If you need more than one consumer per queue, use {AMQP::Consumer} instead.
#
# @return [Boolean] true if there is a consumer tag associated with this Queue instance
# @api public
+ # @deprecated
def subscribed?
- !!@consumer_tag
+ @default_consumer && @default_consumer.subscribed?
end
# Compatibility alias for #on_declare.
#
# @api public
# @deprecated
def callback
- @on_declare
+ return nil if !subscribed?
+
+ @default_consumer.callback
end
@@ -708,9 +856,17 @@
# @api plugin
def reset
initialize(@channel, @name, @opts)
end
+
+ # @private
+ # @api plugin
+ def handle_connection_interruption(method = nil)
+ super(method)
+
+ @declaration_deferrable = EventMachine::DefaultDeferrable.new
+ end
protected
# @private
def self.add_default_options(name, opts, block)