Sha256: ff9f8655dbff503e69c983d45ef106322a46a6f32509797122202dfbd87668ef
Contents?: true
Size: 958 Bytes
Versions: 1
Compression:
Stored size: 958 Bytes
Contents
# -*- encoding : utf-8 -*-
module Untied
  module Consumer
    # Wires a consumer object to a queue: declares the queue on the given
    # channel, binds it to the given exchange and hands every delivered
    # message to the consumer, guarding each delivery against exceptions.
    class Worker
      # Stores the collaborators and logs that the worker was created.
      #
      # @param opts [Hash] worker settings
      # @option opts [#queue] :channel object used to declare the queue
      #   (presumably an AMQP channel -- confirm against callers)
      # @option opts [String] :queue_name name of the queue to declare;
      #   defaults to the empty string when omitted
      # @option opts [#process] :consumer object receiving each message;
      #   defaults to a new Processor
      # @option opts [Object] :exchange exchange the queue is bound to
      def initialize(opts)
        @channel    = opts[:channel]
        @queue_name = opts[:queue_name] || ""
        @consumer   = opts[:consumer] || Processor.new
        @exchange   = opts[:exchange]

        # Logged at construction time; the subscription itself is only set
        # up once #start is called.
        Consumer.config.logger.info "Worker initialized and listening"
      end

      # Declares the queue as exclusive, binds it to the exchange with the
      # "untied.#" routing key and subscribes to it. Each delivery is passed
      # to the consumer's #process inside #safe_process.
      #
      # @return [Object] whatever the channel's #queue call returns
      def start
        @channel.queue(@queue_name, :exclusive => true) do |queue|
          binding = queue.bind(@exchange, :routing_key => "untied.#")

          # The two yielded values are forwarded untouched (presumably the
          # message header and payload -- confirm against the channel API).
          binding.subscribe do |header, payload|
            safe_process { @consumer.process(header, payload) }
          end
        end
      end

      protected

      # Runs the given block and handles any StandardError it raises.
      #
      # When Consumer.config.abort_on_exception is truthy the error is
      # re-raised; otherwise its message and backtrace are written to the
      # configured logger and the error is swallowed.
      #
      # @yield the work to protect
      # @return [Object] the block's result, or the logger's return value
      #   when an error was logged
      # @raise [StandardError] the original error, if aborting is enabled
      def safe_process
        yield
      rescue => e
        raise if Consumer.config.abort_on_exception

        Consumer.config.logger.error e.message
        # Exception#backtrace may be nil; Array() keeps the error handler
        # itself from raising in that case.
        Consumer.config.logger.error Array(e.backtrace).join("\n\t")
      end
    end
  end
end
Version data entries
1 entry across 1 version & 1 rubygem
Version | Path |
---|---|
untied-consumer-0.0.3 | lib/untied-consumer/worker.rb |