lib/message_driver/adapters/bunny_adapter.rb in message-driver-0.7.0 vs lib/message_driver/adapters/bunny_adapter.rb in message-driver-0.7.1
- old
+ new
@@ -179,17 +179,18 @@
class MessageHandler
extend Forwardable
include Logging
attr_accessor :subscription
- def_delegators :subscription, :adapter, :sub_ctx, :consumer, :error_handler, :options
+ def_delegators :subscription, :adapter, :sub_ctx, :consumer, :error_handler, :options, :destination
def initialize(subscription)
@subscription = subscription
end
- def call(message)
+ def call(*message_args)
+ message = sub_ctx.args_to_message(*message_args, destination)
consumer.call(message)
rescue => e
error_handler.call(e, message) unless error_handler.nil?
end
@@ -211,21 +212,23 @@
class ManualAckHandler < MessageHandler
# all functionality implemented in super class
end
class AutoAckHandler < MessageHandler
- def call(message)
+ def call(*message_args)
+ message = sub_ctx.args_to_message(*message_args, destination)
consumer.call(message)
sub_ctx.ack_message(message)
rescue => e
nack_message(e, message)
error_handler.call(e, message) unless error_handler.nil?
end
end
class TransactionalAckHandler < MessageHandler
- def call(message)
+ def call(*message_args)
+ message = sub_ctx.args_to_message(*message_args, destination)
adapter.broker.client.with_message_transaction do
consumer.call(message)
sub_ctx.ack_message(message)
end
rescue => e
@@ -239,11 +242,10 @@
queue = destination.bunny_queue(@sub_ctx.channel)
ch.prefetch(options[:prefetch_size]) if options.key? :prefetch_size
sub_opts = options.merge(adapter.ack_key => true)
@bunny_consumer = queue.subscribe(sub_opts) do |delivery_info, properties, payload|
adapter.broker.client.with_adapter_context(@sub_ctx) do
- message = @sub_ctx.args_to_message(delivery_info, properties, payload, destination)
- @message_handler.call(message)
+ @message_handler.call(delivery_info, properties, payload)
end
end
end
end
end