Sha256: ec6c9f961be974e027667579d84f1e0ae60fa12d20122932e28589051d909a6a

Contents?: true

Size: 1.93 KB

Versions: 2

Compression:

Stored size: 1.93 KB

Contents

require_relative 'responder_handler'
require_relative 'message_handler'
require_relative 'request'
require_relative 'delivery'

class Freddy
  class Consumer

    class EmptyConsumer < Exception
    end

    def initialize(channel, logger)
      @channel, @logger = channel, logger
      @topic_exchange = @channel.topic Freddy::FREDDY_TOPIC_EXCHANGE_NAME
    end

    def consume(destination, options = {}, &block)
      raise EmptyConsumer unless block
      consume_from_queue create_queue(destination), options, &block
    end

    def consume_from_queue(queue, options = {}, &block)
      consumer = queue.subscribe options do |delivery_info, properties, payload|
        Thread.new do
          parsed_payload = parse_payload(payload)
          log_receive_event(queue.name, parsed_payload)
          block.call parsed_payload, Delivery.new(delivery_info, properties)
        end
      end
      @logger.debug "Consuming messages on #{queue.name}"
      ResponderHandler.new consumer, @channel
    end

    def tap_into(pattern, &block)
      queue = @channel.queue("", exclusive: true).bind(@topic_exchange, routing_key: pattern)
      consumer = queue.subscribe do |delivery_info, properties, payload|
        Thread.new do
          block.call parse_payload(payload), delivery_info.routing_key
        end
      end
      @logger.debug "Tapping into messages that match #{pattern}"
      ResponderHandler.new consumer, @channel
    end

    private

    def parse_payload(payload)
      if payload == 'null'
        {}
      else
        Symbolizer.symbolize(JSON(payload))
      end
    end

    def create_queue(destination)
      @channel.queue(destination)
    end

    def log_receive_event(queue_name, payload)
      if defined?(Logasm) && @logger.is_a?(Logasm)
        @logger.debug "Received message", queue: queue_name, payload: payload
      else
        @logger.debug "Received message on #{queue_name} with payload #{payload}"
      end
    end
  end
end

Version data entries

2 entries across 2 versions & 1 rubygems

Version Path
freddy-0.3.7 lib/freddy/consumer.rb
freddy-0.3.6 lib/freddy/consumer.rb