lib/amqp/consumer.rb in amqp-1.1.0.pre1 vs lib/amqp/consumer.rb in amqp-1.1.0.pre2
- old
+ new
@@ -1,21 +1,29 @@
# encoding: utf-8
-require "amq/client/async/consumer"
+require "amqp/consumer_tag_generator"
module AMQP
# AMQP consumers are entities that handle messages delivered to them ("push API" as opposed to "pull API") by AMQP broker.
# Every consumer is associated with a queue. Consumers can be exclusive (no other consumers can be registered for the same queue)
# or not (consumers share the queue). In the case of multiple consumers per queue, messages are distributed in round robin
# manner with respect to channel-level prefetch setting).
#
# @see AMQP::Queue
# @see AMQP::Queue#subscribe
# @see AMQP::Queue#cancel
- class Consumer < AMQ::Client::Async::Consumer
+ class Consumer
#
+ # Behaviours
+ #
+
+ include Callbacks
+ extend ProtocolMethodHandlers
+
+
+ #
# API
#
# @return [AMQP::Channel] Channel this consumer uses
attr_reader :channel
@@ -25,39 +33,57 @@
attr_reader :consumer_tag
# @return [Hash] Custom subscription metadata
attr_reader :arguments
- # @return [AMQ::Client::ConsumerTagGenerator] Consumer tag generator
+ # @return [AMQP::ConsumerTagGenerator] Consumer tag generator
def self.tag_generator
- @tag_generator ||= AMQ::Client::ConsumerTagGenerator.new
+ @tag_generator ||= AMQP::ConsumerTagGenerator.new
end # self.tag_generator
- # @param [AMQ::Client::ConsumerTagGenerator] Assigns consumer tag generator that will be used by consumer instances
- # @return [AMQ::Client::ConsumerTagGenerator] Provided argument
+ # @param [AMQP::ConsumerTagGenerator] Assigns consumer tag generator that will be used by consumer instances
+ # @return [AMQP::ConsumerTagGenerator] Provided argument
def self.tag_generator=(generator)
@tag_generator = generator
end
- def initialize(channel, queue, consumer_tag = nil, exclusive = false, no_ack = false, arguments = {}, no_local = false)
- super(channel, queue, (consumer_tag || self.class.tag_generator.generate_for(queue)), exclusive, no_ack, arguments, no_local)
+ def initialize(channel, queue, consumer_tag = nil, exclusive = false, no_ack = false, arguments = {}, no_local = false, &block)
+ @callbacks = Hash.new
+
+ @channel = channel || raise(ArgumentError, "channel is nil")
+ @connection = channel.connection || raise(ArgumentError, "connection is nil")
+ @queue = queue || raise(ArgumentError, "queue is nil")
+ @consumer_tag = consumer_tag || self.class.tag_generator.generate_for(queue)
+ @exclusive = exclusive
+ @no_ack = no_ack
+ @arguments = arguments
+
+ @no_local = no_local
+
+ self.register_with_channel
+ self.register_with_queue
end # initialize
# @return [Boolean] true if this consumer is exclusive (other consumers for the same queue are not allowed)
def exclusive?
- super
+ !!@exclusive
end # exclusive?
# Begin consuming messages from the queue
# @return [AMQP::Consumer] self
def consume(nowait = false, &block)
@channel.once_open do
@queue.once_declared do
- super(nowait, &block)
+ @connection.send_frame(AMQ::Protocol::Basic::Consume.encode(@channel.id, @queue.name, @consumer_tag, @no_local, @no_ack, @exclusive, nowait, @arguments))
+ self.redefine_callback(:consume, &block)
+
+ @channel.consumers_awaiting_consume_ok.push(self)
+
+ self
end
end
self
end # consume(nowait = false, &block)
@@ -70,22 +96,38 @@
@queue.once_declared do
self.unregister_with_channel
@consumer_tag = self.class.tag_generator.generate_for(@queue)
self.register_with_channel
- super(&block)
+ @connection.send_frame(AMQ::Protocol::Basic::Consume.encode(@channel.id, @queue.name, @consumer_tag, @no_local, @no_ack, @exclusive, block.nil?, @arguments))
+ self.redefine_callback(:consume, &block) if block
+
+ self
end
end
self
end # resubscribe(&block)
# @return [AMQP::Consumer] self
def cancel(nowait = false, &block)
@channel.once_open do
@queue.once_declared do
- super(nowait, &block)
+ @connection.send_frame(AMQ::Protocol::Basic::Cancel.encode(@channel.id, @consumer_tag, nowait))
+ self.clear_callbacks(:delivery)
+ self.clear_callbacks(:consume)
+ self.clear_callbacks(:scancel)
+
+ self.unregister_with_channel
+ self.unregister_with_queue
+
+ if !nowait
+ self.redefine_callback(:cancel, &block)
+ @channel.consumers_awaiting_cancel_ok.push(self)
+ end
+
+ self
end
end
self
end # cancel(nowait = false, &block)
@@ -127,82 +169,208 @@
h = Header.new(@channel, basic_deliver, headers.decode_payload)
block.call(h, payload, basic_deliver.consumer_tag, basic_deliver.delivery_tag, basic_deliver.redelivered, basic_deliver.exchange, basic_deliver.routing_key)
end
}
- super(&delivery_shim)
+ self.append_callback(:delivery, &delivery_shim)
+
+ self
end # on_delivery(&block)
+ # @return [String] Readable representation of relevant object state.
+ def inspect
+ "#<AMQP::Consumer:#{@consumer_tag}> queue=#{@queue.name} channel=#{@channel.id} callbacks=#{@callbacks.inspect}"
+ end # inspect
+
+
+ def on_cancel(&block)
+ self.append_callback(:scancel, &block)
+
+ self
+ end # on_cancel(&block)
+
+ def handle_cancel(basic_cancel)
+ self.exec_callback(:scancel, basic_cancel)
+ end # handle_cancel(basic_cancel)
+
+
+
# @group Acknowledging & Rejecting Messages
# Acknowledge a delivery tag.
# @return [Consumer] self
#
# @api public
- # @see http://files.travis-ci.org/docs/amqp/0.9.1/AMQP091Reference.pdf AMQP 0.9.1 protocol documentation (Section 1.8.3.13.)
+ # @see http://bit.ly/htCzCX AMQP 0.9.1 protocol documentation (Section 1.8.3.13.)
def acknowledge(delivery_tag)
- super(delivery_tag)
+ @channel.acknowledge(delivery_tag)
+
+ self
end # acknowledge(delivery_tag)
#
# @return [Consumer] self
#
# @api public
- # @see http://files.travis-ci.org/docs/amqp/0.9.1/AMQP091Reference.pdf AMQP 0.9.1 protocol documentation (Section 1.8.3.14.)
+ # @see http://bit.ly/htCzCX AMQP 0.9.1 protocol documentation (Section 1.8.3.14.)
def reject(delivery_tag, requeue = true)
- super(delivery_tag, requeue)
+ @channel.reject(delivery_tag, requeue)
+
+ self
end # reject(delivery_tag, requeue = true)
+ # Defines a callback that will be executed when AMQP connection is recovered after a network failure..
+ # Only one callback can be defined (the one defined last replaces previously added ones).
+ #
+ # @api public
+ # Defines a callback that will be executed when AMQP connection is recovered after a network failure..
+ # Only one callback can be defined (the one defined last replaces previously added ones).
+ #
+ # @api public
+ def on_recovery(&block)
+ self.redefine_callback(:after_recovery, &block)
+ end # on_recovery(&block)
+ alias after_recovery on_recovery
+
# @endgroup
# @group Error Handling & Recovery
# Defines a callback that will be executed after TCP connection is interrupted (typically because of a network failure).
# Only one callback can be defined (the one defined last replaces previously added ones).
#
# @api public
def on_connection_interruption(&block)
- super(&block)
+ self.redefine_callback(:after_connection_interruption, &block)
end # on_connection_interruption(&block)
alias after_connection_interruption on_connection_interruption
+ # @private
+ def handle_connection_interruption(method = nil)
+ self.exec_callback_yielding_self(:after_connection_interruption)
+ end # handle_connection_interruption
+
# Defines a callback that will be executed after TCP connection is recovered after a network failure
# but before AMQP connection is re-opened.
# Only one callback can be defined (the one defined last replaces previously added ones).
#
# @api public
def before_recovery(&block)
- super(&block)
+ self.redefine_callback(:before_recovery, &block)
end # before_recovery(&block)
- # Defines a callback that will be executed when AMQP connection is recovered after a network failure..
- # Only one callback can be defined (the one defined last replaces previously added ones).
- #
- # @api public
- def on_recovery(&block)
- super(&block)
- end # on_recovery(&block)
- alias after_recovery on_recovery
+ # @private
+ def run_before_recovery_callbacks
+ self.exec_callback_yielding_self(:before_recovery)
+ end
+ # @private
+ def run_after_recovery_callbacks
+ self.exec_callback_yielding_self(:after_recovery)
+ end
+
+
# Called by associated connection object when AMQP connection has been re-established
# (for example, after a network failure).
#
# @api plugin
def auto_recover
- super
+ self.exec_callback_yielding_self(:before_recovery)
+ self.resubscribe
+ self.exec_callback_yielding_self(:after_recovery)
end # auto_recover
# @endgroup
- # @return [String] Readable representation of relevant object state.
- def inspect
- "#<AMQP::Consumer:#{@consumer_tag}> queue=#{@queue.name} channel=#{@channel.id} callbacks=#{@callbacks.inspect}"
- end # inspect
+ def to_s
+ "#<#{self.class.name} @consumer_tag=#{@consumer_tag} @queue=#{@queue.name} @channel=#{@channel.id}>"
+ end
+ #
+ # Implementation
+ #
+
+ def handle_delivery(basic_deliver, metadata, payload)
+ self.exec_callback(:delivery, basic_deliver, metadata, payload)
+ end # handle_delivery(basic_deliver, metadata, payload)
+
+ def handle_consume_ok(consume_ok)
+ self.exec_callback_once(:consume, consume_ok)
+ end # handle_consume_ok(consume_ok)
+
+ def handle_cancel_ok(cancel_ok)
+ @consumer_tag = nil
+
+ # detach from object graph so that this object will be garbage-collected
+ @queue = nil
+ @channel = nil
+ @connection = nil
+
+ self.exec_callback_once(:cancel, cancel_ok)
+ end # handle_cancel_ok(method)
+
+
+
+ self.handle(AMQ::Protocol::Basic::ConsumeOk) do |connection, frame|
+ channel = connection.channels[frame.channel]
+ consumer = channel.consumers_awaiting_consume_ok.shift
+
+ consumer.handle_consume_ok(frame.decode_payload)
+ end
+
+
+ self.handle(AMQ::Protocol::Basic::CancelOk) do |connection, frame|
+ channel = connection.channels[frame.channel]
+ consumer = channel.consumers_awaiting_cancel_ok.shift
+
+ consumer.handle_cancel_ok(frame.decode_payload)
+ end
+
+
+ self.handle(AMQ::Protocol::Basic::Deliver) do |connection, method_frame, content_frames|
+ channel = connection.channels[method_frame.channel]
+ basic_deliver = method_frame.decode_payload
+ consumer = channel.consumers[basic_deliver.consumer_tag]
+
+ metadata = content_frames.shift
+ payload = content_frames.map { |frame| frame.payload }.join
+
+ # Handle the delivery only if the consumer still exists.
+ # The broker has been known to deliver a few messages after the consumer has been shut down.
+ consumer.handle_delivery(basic_deliver, metadata, payload) if consumer
+ end
+
+
+ protected
+
+ def register_with_channel
+ @channel.consumers[@consumer_tag] = self
+ end # register_with_channel
+
+ def register_with_queue
+ @queue.consumers[@consumer_tag] = self
+ end # register_with_queue
+
+ def unregister_with_channel
+ @channel.consumers.delete(@consumer_tag)
+ end # register_with_channel
+
+ def unregister_with_queue
+ @queue.consumers.delete(@consumer_tag)
+ end # register_with_queue
+
+ handle(AMQ::Protocol::Basic::Cancel) do |connection, method_frame|
+ channel = connection.channels[method_frame.channel]
+ basic_cancel = method_frame.decode_payload
+ consumer = channel.consumers[basic_cancel.consumer_tag]
+
+ # Handle the delivery only if the consumer still exists.
+ consumer.handle_cancel(basic_cancel) if consumer
+ end
end # Consumer
end # AMQP