Sha256: c46b578c071f59bbaad06ab12e179135162af8f61d01a8b6ba9545861dc6995f
Contents?: true
Size: 1.91 KB
Versions: 3
Compression:
Stored size: 1.91 KB
Contents
require_relative 'responder_handler' require_relative 'message_handler' require_relative 'request' require_relative 'delivery' class Freddy class Consumer class EmptyConsumer < Exception end def initialize(channel, logger) @channel, @logger = channel, logger @topic_exchange = @channel.topic Freddy::FREDDY_TOPIC_EXCHANGE_NAME end def consume(destination, options = {}, &block) raise EmptyConsumer unless block consume_from_queue create_queue(destination), options, &block end def consume_from_queue(queue, options = {}, &block) consumer = queue.subscribe options do |delivery_info, properties, payload| parsed_payload = parse_payload(payload) log_receive_event(queue.name, parsed_payload) block.call parsed_payload, Delivery.new(delivery_info, properties) end @logger.debug "Consuming messages on #{queue.name}" ResponderHandler.new consumer, @channel end def tap_into(pattern, &block) queue = @channel.queue("", exclusive: true).bind(@topic_exchange, routing_key: pattern) consumer = queue.subscribe do |delivery_info, properties, payload| block.call parse_payload(payload), delivery_info.routing_key end @logger.debug "Tapping into messages that match #{pattern}" ResponderHandler.new consumer, @channel end private def parse_payload(payload) if payload == 'null' {} else Symbolizer.symbolize(JSON(payload)) end end def create_queue(destination) @channel.queue(destination) end def log_receive_event(queue_name, payload) if defined?(Logasm) && @logger.is_a?(Logasm) @logger.info "Received message", queue: queue_name @logger.debug "Received message", queue: queue_name, payload: payload else @logger.debug "Received message on #{queue_name} with payload #{payload}" end end end end
Version data entries
3 entries across 3 versions & 1 rubygems
Version | Path |
---|---|
freddy-0.3.2 | lib/freddy/consumer.rb |
freddy-0.3.1 | lib/freddy/consumer.rb |
freddy-0.3.0 | lib/freddy/consumer.rb |