lib/message_queue/adapters/bunny/consumer.rb in message_queue-0.0.4 vs lib/message_queue/adapters/bunny/consumer.rb in message_queue-0.1.0

- old
+ new

@@ -1,7 +1,6 @@ -class MessageQueue::Adapters::Bunny::Connection::Consumer - attr_reader :connection +class MessageQueue::Adapters::Bunny::Connection::Consumer < MessageQueue::Consumer attr_reader :queue_options, :queue_name attr_reader :exchange_options, :exchange_name, :exchange_routing_key attr_reader :subscribe_options # Public: Initialize a new Bunny consumer. @@ -23,39 +22,50 @@ # and # https://github.com/ruby-amqp/bunny/blob/master/lib/bunny/exchange.rb. # # Returns a Consumer. def initialize(connection, options = {}) - @connection = connection + super - options = options.dup - - @queue_options = options.fetch(:queue) + @queue_options = self.options.fetch(:queue) @queue_name = queue_options.delete(:name) || (raise "Missing queue name") - @exchange_options = options.fetch(:exchange) + @exchange_options = self.options.fetch(:exchange) @exchange_name = exchange_options.delete(:name) || (raise "Missing exchange name") @exchange_routing_key = exchange_options.delete(:routing_key) || queue_name - @subscribe_options = options.fetch(:subscribe, {}) + @subscribe_options = self.options.fetch(:subscribe, {}).merge(:ack => true) end def subscribe(options = {}, &block) @subscription = queue.subscribe(subscribe_options.merge(options)) do |delivery_info, metadata, payload| - block.call(delivery_info, metadata, connection.serializer.load(payload)) + begin + message = MessageQueue::Message.new(:message_id => metadata[:message_id], + :type => metadata[:type], + :timestamp => metadata[:timestamp], + :routing_key => delivery_info[:routing_key], + :payload => load_object(payload)) + block.call(message) + ensure + ack(delivery_info.delivery_tag) + end end end - def unsubscribe + def unsubscribe(options = {}) @subscription.cancel if @subscription end def queue @queue ||= channel.queue(queue_name, queue_options).bind(exchange_name, :routing_key => exchange_routing_key) end + def ack(delivery_tag) + channel.ack(delivery_tag, false) + end + private def channel - connection.connection.create_channel + @channel ||= connection.connection.create_channel end end