Sha256: 938d7f141e9c7a4bb1d03cbf0cc164fda4930efe6f8fac53d5cf796b0878028c

Contents?: true

Size: 1.51 KB

Versions: 2

Compression:

Stored size: 1.51 KB

Contents

class Freddy
  module Consumers
    class RespondToConsumer
      def initialize(consume_thread_pool, producer, logger)
        @consume_thread_pool = consume_thread_pool
        @producer = producer
        @logger = logger
      end

      def consume(destination, channel, &block)
        consumer = consume_from_destination(destination, channel) do |delivery|
          log_receive_event(destination, delivery)

          handler_class = MessageHandlers.for_type(delivery.type)
          handler = handler_class.new(@producer, destination, @logger)

          msg_handler = MessageHandler.new(handler, delivery)
          handler.handle_message delivery.payload, msg_handler, &block
        end

        ResponderHandler.new(consumer, @consume_thread_pool)
      end

      private

      def consume_from_destination(destination, channel, &block)
        channel.queue(destination).subscribe do |delivery|
          process_message(delivery, &block)
        end
      end

      def process_message(delivery, &block)
        @consume_thread_pool.process do
          block.call(delivery)
        end
      end

      def log_receive_event(destination, delivery)
        if defined?(Logasm) && @logger.is_a?(Logasm)
          @logger.debug "Received message", queue: destination, payload: delivery.payload, correlation_id: delivery.correlation_id
        else
          @logger.debug "Received message on #{destination} with payload #{delivery.payload} with correlation_id #{delivery.correlation_id}"
        end
      end
    end
  end
end

Version data entries

2 entries across 2 versions & 2 rubygems

Version Path
freddy-jruby-0.4.9 lib/freddy/consumers/respond_to_consumer.rb
freddy-0.4.9 lib/freddy/consumers/respond_to_consumer.rb