lib/message_driver/adapters/bunny_adapter.rb in message-driver-0.5.1 vs lib/message_driver/adapters/bunny_adapter.rb in message-driver-0.5.2
- old
+ new
@@ -148,11 +148,11 @@
fail MessageDriver::Error,
'subscriptions are only supported with QueueDestinations'
end
@sub_ctx = adapter.new_subscription_context(self)
@error_handler = options[:error_handler]
- @message_handler = case options[:ack]
+ @message_handler = case options.delete(:ack)
when :auto, nil
AutoAckHandler.new(self)
when :manual
ManualAckHandler.new(self)
when :transactional
@@ -236,11 +236,12 @@
def start_subscription
@sub_ctx.with_channel do |ch|
queue = destination.bunny_queue(@sub_ctx.channel)
ch.prefetch(options[:prefetch_size]) if options.key? :prefetch_size
- @bunny_consumer = queue.subscribe(options.merge(manual_ack: true)) do |delivery_info, properties, payload|
+ sub_opts = options.merge(adapter.ack_key => true)
+ @bunny_consumer = queue.subscribe(sub_opts) do |delivery_info, properties, payload|
adapter.broker.client.with_adapter_context(@sub_ctx) do
message = @sub_ctx.args_to_message(delivery_info, properties, payload, destination)
@message_handler.call(message)
end
end
@@ -250,12 +251,15 @@
def initialize(broker, config)
validate_bunny_version
@broker = broker
@config = config
+ @ack_key = Bunny::VERSION >= '1.5.0' ? :manual_ack : :ack
end
+ attr_reader :ack_key
+
def connection(ensure_started = true)
if ensure_started
begin
@connection ||= Bunny::Session.new(@config)
@connection.start
@@ -389,10 +393,10 @@
fail MessageDriver::Error, "You can't pop a message off an exchange" if destination.is_a? ExchangeDestination
with_channel(false) do |ch|
queue = ch.queue(destination.name, passive: true)
- message = queue.pop(ack: options.fetch(:client_ack, false))
+ message = queue.pop(adapter.ack_key => options.fetch(:client_ack, false))
if message.nil? || message[0].nil?
nil
else
args_to_message(*message, destination)
end