lib/bbk/amqp/consumer.rb in bbk-amqp-1.0.0.152789 vs lib/bbk/amqp/consumer.rb in bbk-amqp-1.1.0.199380
- old
+ new
@@ -4,10 +4,11 @@
module BBK
module AMQP
class Consumer
attr_reader :connection, :queue_name, :queue, :rejection_policy, :options, :logger
+ attr_accessor :publisher
DEFAULT_OPTIONS = {
consumer_pool_size: 3,
consumer_pool_abort_on_exception: true,
prefetch_size: 10,
@@ -15,14 +16,15 @@
rejection_policy: RejectionPolicies::Requeue.new
}.freeze
PROTOCOLS = %w[mq amqp amqps].freeze
- def initialize(connection, queue_name: nil, **options)
+ def initialize(connection, queue_name: nil, publisher: nil, **options)
@connection = connection
@channel = options.delete(:channel)
@queue = options.delete(:queue)
+ @publisher = publisher
if @queue.nil? && queue_name.nil?
raise ArgumentError.new('queue_name or queue must be provided!')
end
@@ -77,11 +79,21 @@
# @param answer [BBK::App::Dispatcher::Result] answer message
def ack(incoming, *args, answer: nil, **kwargs)
# [] - для работы тестов. В реальности вернется объект VersionedDeliveryTag у
# которого to_i (вызывается внутри channel.ack) вернет фактическоe число
# logger.debug "Ack message #{incoming.headers[:type]}[#{incoming.headers[:message_id]}] on channel: #{incoming.delivery_info[:channel]&.id}[#{incoming.delivery_info[:channel]&.object_id}] delivery tag: #{incoming.delivery_info[:delivery_tag].to_i}"
+ send_answer(incoming, answer) unless answer.nil?
logger.debug "Ack message #{incoming.headers[:type]}[#{incoming.headers[:message_id]}] delivery tag: #{incoming.delivery_info[:delivery_tag].to_i}"
incoming.delivery_info[:channel].ack incoming.delivery_info[:delivery_tag]
+ end
+
+ protected def send_answer(incoming, answer)
+ if publisher.nil?
+ logger.error "Can't answer message: empty publisher"
+ raise "Publisher not configured in consumer #{self.inspect}"
+ end
+ logger.debug "Send answer message for incoming(message_id=#{incoming.message_id}) to #{answer.route}"
+ publisher.publish(answer).value!
end
# Nack incoming message
# @param incoming [BBK::AMQP::Message] nack procesing message
def nack(incoming, *args, error: nil, **_kwargs)