lib/message_driver/adapters/bunny_adapter.rb in message-driver-0.7.0 vs lib/message_driver/adapters/bunny_adapter.rb in message-driver-0.7.1

- old
+ new

@@ -179,17 +179,18 @@ class MessageHandler extend Forwardable include Logging attr_accessor :subscription - def_delegators :subscription, :adapter, :sub_ctx, :consumer, :error_handler, :options + def_delegators :subscription, :adapter, :sub_ctx, :consumer, :error_handler, :options, :destination def initialize(subscription) @subscription = subscription end - def call(message) + def call(*message_args) + message = sub_ctx.args_to_message(*message_args, destination) consumer.call(message) rescue => e error_handler.call(e, message) unless error_handler.nil? end @@ -211,21 +212,23 @@ class ManualAckHandler < MessageHandler # all functionality implemented in super class end class AutoAckHandler < MessageHandler - def call(message) + def call(*message_args) + message = sub_ctx.args_to_message(*message_args, destination) consumer.call(message) sub_ctx.ack_message(message) rescue => e nack_message(e, message) error_handler.call(e, message) unless error_handler.nil? end end class TransactionalAckHandler < MessageHandler - def call(message) + def call(*message_args) + message = sub_ctx.args_to_message(*message_args, destination) adapter.broker.client.with_message_transaction do consumer.call(message) sub_ctx.ack_message(message) end rescue => e @@ -239,11 +242,10 @@ queue = destination.bunny_queue(@sub_ctx.channel) ch.prefetch(options[:prefetch_size]) if options.key? :prefetch_size sub_opts = options.merge(adapter.ack_key => true) @bunny_consumer = queue.subscribe(sub_opts) do |delivery_info, properties, payload| adapter.broker.client.with_adapter_context(@sub_ctx) do - message = @sub_ctx.args_to_message(delivery_info, properties, payload, destination) - @message_handler.call(message) + @message_handler.call(delivery_info, properties, payload) end end end end end