lib/bbk/amqp/consumer.rb in bbk-amqp-1.0.0.152789 vs lib/bbk/amqp/consumer.rb in bbk-amqp-1.1.0.199380

- old
+ new

@@ -4,10 +4,11 @@ module BBK module AMQP class Consumer attr_reader :connection, :queue_name, :queue, :rejection_policy, :options, :logger + attr_accessor :publisher DEFAULT_OPTIONS = { consumer_pool_size: 3, consumer_pool_abort_on_exception: true, prefetch_size: 10, @@ -15,14 +16,15 @@ rejection_policy: RejectionPolicies::Requeue.new }.freeze PROTOCOLS = %w[mq amqp amqps].freeze - def initialize(connection, queue_name: nil, **options) + def initialize(connection, queue_name: nil, publisher: nil, **options) @connection = connection @channel = options.delete(:channel) @queue = options.delete(:queue) + @publisher = publisher if @queue.nil? && queue_name.nil? raise ArgumentError.new('queue_name or queue must be provided!') end @@ -77,11 +79,21 @@ # @param answer [BBK::App::Dispatcher::Result] answer message def ack(incoming, *args, answer: nil, **kwargs) # [] - для работы тестов. В реальности вернется объект VersionedDeliveryTag у # которого to_i (вызывается внутри channel.ack) вернет фактическоe число # logger.debug "Ack message #{incoming.headers[:type]}[#{incoming.headers[:message_id]}] on channel: #{incoming.delivery_info[:channel]&.id}[#{incoming.delivery_info[:channel]&.object_id}] delivery tag: #{incoming.delivery_info[:delivery_tag].to_i}" + send_answer(incoming, answer) unless answer.nil? logger.debug "Ack message #{incoming.headers[:type]}[#{incoming.headers[:message_id]}] delivery tag: #{incoming.delivery_info[:delivery_tag].to_i}" incoming.delivery_info[:channel].ack incoming.delivery_info[:delivery_tag] + end + + protected def send_answer(incoming, answer) + if publisher.nil? + logger.error "Can't answer message: empty publisher" + raise "Publisher not configured in consumer #{self.inspect}" + end + logger.debug "Send answer message for incoming(message_id=#{incoming.message_id}) to #{answer.route}" + publisher.publish(answer).value! end # Nack incoming message # @param incoming [BBK::AMQP::Message] nack procesing message def nack(incoming, *args, error: nil, **_kwargs)