Sha256: ff9f8655dbff503e69c983d45ef106322a46a6f32509797122202dfbd87668ef

Contents?: true

Size: 958 Bytes

Versions: 1

Compression:

Stored size: 958 Bytes

Contents

# -*- encoding : utf-8 -*-
module Untied
  module Consumer
    class Worker
      def initialize(opts)
        @channel = opts[:channel]
        @queue_name = opts[:queue_name] || ""
        @consumer = opts[:consumer] || Processor.new
        @exchange = opts[:exchange]

        Consumer.config.logger.info "Worker initialized and listening"
      end

      def start
        @channel.queue(@queue_name, :exclusive => true) do |queue|
          queue.bind(@exchange, :routing_key => "untied.#").subscribe do |h,p|
            safe_process { @consumer.process(h,p) }
          end
        end
      end

      protected

      def safe_process(&block)
        begin
          yield
        rescue => e
          if Consumer.config.abort_on_exception
            raise e
          else
            Consumer.config.logger.error e.message
            Consumer.config.logger.error e.backtrace.join("\n\t")
          end
        end
      end
    end
  end
end

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
untied-consumer-0.0.3 lib/untied-consumer/worker.rb