Sha256: 6a6e5dbefb1ccb56df99cda271ad9fbfed12a2f93a4a3c6e53749fcb27edf227

Contents?: true

Size: 1.04 KB

Versions: 2

Compression:

Stored size: 1.04 KB

Contents

require 'pika_que/broker'
require 'pika_que/logging'
require 'pika_que/util'

module PikaQue
  # Owns one broker, one thread pool and a set of worker instances, and
  # drives their lifecycle (prepare -> run -> stop).
  class Processor
    include Logging

    # Builds the broker, the thread pool and the worker instances.
    #
    # @param opts [Hash] overrides merged on top of PikaQue.config. Keys read
    #   here: :concurrency (thread pool size, defaults to 1) and :workers
    #   (list of worker class references resolved through
    #   PikaQue::Util.constantize, defaults to an empty list).
    def initialize(opts = {})
      @opts = PikaQue.config.merge(opts)
      # The broker is started immediately so workers receive a live broker.
      @broker = PikaQue::Broker.new(self, @opts).tap { |b| b.start }
      @pool = Concurrent::FixedThreadPool.new(@opts[:concurrency] || 1)
      # Every worker gets the merged options plus the shared broker and pool.
      proc_config = @opts.merge(broker: @broker, worker_pool: @pool)
      @workers = @opts.fetch(:workers, []).map { |w| PikaQue::Util.constantize(w).new(proc_config) }
      @thread = nil
    end

    # Logs the configured worker classes and calls #prepare on each worker.
    def setup
      logger.info "setting up processor with workers: #{@workers.map(&:class)}"
      @workers.each(&:prepare)
    end

    # Calls #run on each worker, in configuration order.
    def process
      @workers.each(&:run)
    end

    # Runs #setup and #process on a dedicated background thread.
    #
    # The Thread object itself is kept in @thread. Chaining the
    # `abort_on_exception =` setter onto the assignment would store the
    # setter's argument (`true`) instead of the thread, so the flag is set
    # from inside the thread body, before any work starts.
    #
    # @return [Thread] the processor thread
    def start
      @thread = Thread.new do
        # An unhandled error in setup/process should take the process down
        # rather than die silently in the background.
        Thread.current.abort_on_exception = true
        Thread.current['label'] = 'processor'
        setup
        process
      end
    end

    # Stops the workers, then shuts down the pool and the broker.
    def stop
      @workers.each(&:stop)

      # Stop accepting new work, then wait for in-flight work with a
      # timeout of 12 before tearing down the broker.
      @pool.shutdown
      @pool.wait_for_termination 12

      @broker.cleanup(true)
      @broker.stop
    end
  end
end

Version data entries

2 entries across 2 versions & 1 rubygems

Version Path
pika_que-0.1.5 lib/pika_que/processor.rb
pika_que-0.1.4 lib/pika_que/processor.rb