Sha256: 4b1ff94d4294708886e72125e461c670765dfceaa74267291ae819e7c5354e8b
Contents?: true
Size: 1.42 KB
Versions: 6
Compression:
Stored size: 1.42 KB
Contents
class Freddy
  module Consumers
    # Subscribes to a destination queue and hands every delivery to a handler
    # obtained from a handler factory. Each delivery is processed through the
    # supplied consume thread pool.
    class RespondToConsumer
      # consume_thread_pool - object responding to #process with a block;
      #                       every delivery callback is run through it.
      # logger              - object responding to #debug; a Logasm instance
      #                       receives structured fields, anything else gets
      #                       a single interpolated string.
      def initialize(consume_thread_pool, logger)
        @consume_thread_pool = consume_thread_pool
        @logger = logger
      end

      # Starts consuming from +destination+ on +channel+.
      #
      # For every delivery: logs the receive event, asks +handler_factory+ to
      # build a handler for the delivery type and destination, wraps the pair
      # in a MessageHandler and calls the handler's #handle_message with the
      # payload, the MessageHandler and the caller's block.
      #
      # Returns a ResponderHandler built from the subscription and the
      # consume thread pool.
      def consume(destination, channel, handler_factory, &block)
        on_delivery = proc do |delivery|
          log_receive_event(destination, delivery)

          handler = handler_factory.build(delivery.type, destination)
          message_handler = MessageHandler.new(handler, delivery)
          handler.handle_message(delivery.payload, message_handler, &block)
        end

        subscription = consume_from_destination(destination, channel, &on_delivery)
        ResponderHandler.new(subscription, @consume_thread_pool)
      end

      private

      # Subscribes to the queue named +destination+ and forwards each delivery
      # to #process_message. Returns whatever the queue's #subscribe returns.
      def consume_from_destination(destination, channel, &block)
        queue = channel.queue(destination)
        queue.subscribe { |delivery| process_message(delivery, &block) }
      end

      # Runs +block+ with +delivery+ through the consume thread pool.
      def process_message(delivery, &block)
        @consume_thread_pool.process { block.call(delivery) }
      end

      # Emits a debug entry describing the received delivery.
      def log_receive_event(destination, delivery)
        # Logasm is optional: only use structured fields when the constant is
        # loaded and the configured logger is actually an instance of it.
        structured = defined?(Logasm) && @logger.is_a?(Logasm)

        if structured
          @logger.debug "Received message",
                        queue: destination,
                        payload: delivery.payload,
                        correlation_id: delivery.correlation_id
        else
          @logger.debug "Received message on #{destination} with payload #{delivery.payload} with correlation_id #{delivery.correlation_id}"
        end
      end
    end
  end
end
Version data entries
6 entries across 6 versions & 2 rubygems