lib/message_driver/adapters/bunny_adapter.rb in message-driver-0.5.1 vs lib/message_driver/adapters/bunny_adapter.rb in message-driver-0.5.2

- old
+ new

@@ -148,11 +148,11 @@ fail MessageDriver::Error, 'subscriptions are only supported with QueueDestinations' end @sub_ctx = adapter.new_subscription_context(self) @error_handler = options[:error_handler] - @message_handler = case options[:ack] + @message_handler = case options.delete(:ack) when :auto, nil AutoAckHandler.new(self) when :manual ManualAckHandler.new(self) when :transactional @@ -236,11 +236,12 @@ def start_subscription @sub_ctx.with_channel do |ch| queue = destination.bunny_queue(@sub_ctx.channel) ch.prefetch(options[:prefetch_size]) if options.key? :prefetch_size - @bunny_consumer = queue.subscribe(options.merge(manual_ack: true)) do |delivery_info, properties, payload| + sub_opts = options.merge(adapter.ack_key => true) + @bunny_consumer = queue.subscribe(sub_opts) do |delivery_info, properties, payload| adapter.broker.client.with_adapter_context(@sub_ctx) do message = @sub_ctx.args_to_message(delivery_info, properties, payload, destination) @message_handler.call(message) end end @@ -250,12 +251,15 @@ def initialize(broker, config) validate_bunny_version @broker = broker @config = config + @ack_key = Bunny::VERSION >= '1.5.0' ? :manual_ack : :ack end + attr_reader :ack_key + def connection(ensure_started = true) if ensure_started begin @connection ||= Bunny::Session.new(@config) @connection.start @@ -389,10 +393,10 @@ fail MessageDriver::Error, "You can't pop a message off an exchange" if destination.is_a? ExchangeDestination with_channel(false) do |ch| queue = ch.queue(destination.name, passive: true) - message = queue.pop(ack: options.fetch(:client_ack, false)) + message = queue.pop(adapter.ack_key => options.fetch(:client_ack, false)) if message.nil? || message[0].nil? nil else args_to_message(*message, destination) end