Sha256: b688dba7d25092d8add3f95d5439022bef531850650e3f98ee4f2a4e44198702

Contents?: true

Size: 1.06 KB

Versions: 6

Compression:

Stored size: 1.06 KB

Contents

class Freddy
  module Consumers
    class ResponseConsumer
      def initialize(logger)
        @logger = logger
        @dedicated_thread_pool = Thread.pool(1)
      end

      def consume(queue, &block)
        @logger.debug "Consuming messages on #{queue.name}"
        consumer = queue.subscribe do |delivery|
          process_message(queue, delivery, &block)
        end
        ResponderHandler.new(consumer, @dedicated_thread_pool)
      end

      private

      def process_message(queue, delivery, &block)
        @dedicated_thread_pool.process do
          log_receive_event(queue.name, delivery)
          block.call(delivery)
        end
      end

      def log_receive_event(queue_name, delivery)
        if defined?(Logasm) && @logger.is_a?(Logasm)
          @logger.debug "Received message", queue: queue_name, payload: delivery.payload, correlation_id: delivery.correlation_id
        else
          @logger.debug "Received message on #{queue_name} with payload #{delivery.payload} with correlation_id #{delivery.correlation_id}"
        end
      end
    end
  end
end

Version data entries

6 entries across 6 versions & 2 rubygems

Version Path
freddy-0.5.3 lib/freddy/consumers/response_consumer.rb
freddy-0.5.2 lib/freddy/consumers/response_consumer.rb
freddy-jruby-0.5.1 lib/freddy/consumers/response_consumer.rb
freddy-0.5.1 lib/freddy/consumers/response_consumer.rb
freddy-jruby-0.5.0 lib/freddy/consumers/response_consumer.rb
freddy-0.5.0 lib/freddy/consumers/response_consumer.rb