lib/message_queue/adapters/bunny/consumer.rb in message_queue-0.0.4 vs lib/message_queue/adapters/bunny/consumer.rb in message_queue-0.1.0
- old
+ new
@@ -1,7 +1,6 @@
-class MessageQueue::Adapters::Bunny::Connection::Consumer
- attr_reader :connection
+class MessageQueue::Adapters::Bunny::Connection::Consumer < MessageQueue::Consumer
attr_reader :queue_options, :queue_name
attr_reader :exchange_options, :exchange_name, :exchange_routing_key
attr_reader :subscribe_options
# Public: Initialize a new Bunny consumer.
@@ -23,39 +22,50 @@
# and
# https://github.com/ruby-amqp/bunny/blob/master/lib/bunny/exchange.rb.
#
# Returns a Consumer.
def initialize(connection, options = {})
- @connection = connection
+ super
- options = options.dup
-
- @queue_options = options.fetch(:queue)
+ @queue_options = self.options.fetch(:queue)
@queue_name = queue_options.delete(:name) || (raise "Missing queue name")
- @exchange_options = options.fetch(:exchange)
+ @exchange_options = self.options.fetch(:exchange)
@exchange_name = exchange_options.delete(:name) || (raise "Missing exchange name")
@exchange_routing_key = exchange_options.delete(:routing_key) || queue_name
- @subscribe_options = options.fetch(:subscribe, {})
+ @subscribe_options = self.options.fetch(:subscribe, {}).merge(:ack => true)
end
def subscribe(options = {}, &block)
@subscription = queue.subscribe(subscribe_options.merge(options)) do |delivery_info, metadata, payload|
- block.call(delivery_info, metadata, connection.serializer.load(payload))
+ begin
+ message = MessageQueue::Message.new(:message_id => metadata[:message_id],
+ :type => metadata[:type],
+ :timestamp => metadata[:timestamp],
+ :routing_key => delivery_info[:routing_key],
+ :payload => load_object(payload))
+ block.call(message)
+ ensure
+ ack(delivery_info.delivery_tag)
+ end
end
end
- def unsubscribe
+ def unsubscribe(options = {})
@subscription.cancel if @subscription
end
def queue
@queue ||= channel.queue(queue_name, queue_options).bind(exchange_name, :routing_key => exchange_routing_key)
end
+ def ack(delivery_tag)
+ channel.ack(delivery_tag, false)
+ end
+
private
def channel
- connection.connection.create_channel
+ @channel ||= connection.connection.create_channel
end
end