Sha256: 6a6e5dbefb1ccb56df99cda271ad9fbfed12a2f93a4a3c6e53749fcb27edf227
Contents?: true
Size: 1.04 KB
Versions: 2
Compression:
Stored size: 1.04 KB
Contents
require 'pika_que/broker'
require 'pika_que/logging'
require 'pika_que/util'

module PikaQue
  # Owns one broker, one fixed-size thread pool and a list of worker
  # instances, and drives their lifecycle (prepare -> run -> stop).
  class Processor
    include Logging

    # Builds the broker, the thread pool and the workers.
    #
    # @param opts [Hash] overrides merged on top of PikaQue.config. Keys read
    #   here: :concurrency (pool size, falls back to 1 when nil/absent) and
    #   :workers (entries passed to PikaQue::Util.constantize, defaults to []).
    def initialize(opts = {})
      @opts = PikaQue.config.merge(opts)
      # The broker is started immediately, before any worker is constructed.
      @broker = PikaQue::Broker.new(self, @opts).tap(&:start)
      @pool = Concurrent::FixedThreadPool.new(@opts[:concurrency] || 1)

      # Every worker receives the merged options plus the shared broker/pool.
      proc_config = @opts.merge(broker: @broker, worker_pool: @pool)
      @workers = @opts.fetch(:workers, []).map do |worker|
        PikaQue::Util.constantize(worker).new(proc_config)
      end
      @thread = nil
    end

    # Logs the worker classes and calls #prepare on each worker.
    def setup
      logger.info "setting up processor with workers: #{@workers.map(&:class)}"
      @workers.each(&:prepare)
    end

    # Calls #run on each worker.
    def process
      @workers.each(&:run)
    end

    # Runs #setup followed by #process on a new thread labelled 'processor'.
    # An unhandled exception in that thread aborts the process
    # (abort_on_exception).
    #
    # @return [true]
    def start
      # Keep the Thread object itself. Chaining `.abort_on_exception = true`
      # onto the `Thread.new ... end` expression would store `true` in
      # @thread, because an attribute assignment evaluates to its right-hand
      # side.
      @thread = Thread.new do
        Thread.current['label'] = 'processor'
        setup
        process
      end
      @thread.abort_on_exception = true
    end

    # Stops every worker, shuts the pool down and waits for it to terminate,
    # then cleans up and stops the broker.
    def stop
      @workers.each(&:stop)
      @pool.shutdown
      # NOTE(review): 12 is presumably a timeout in seconds - confirm against
      # the concurrent-ruby version in use.
      @pool.wait_for_termination 12
      @broker.cleanup(true)
      @broker.stop
    end
  end
end
Version data entries
2 entries across 2 versions & 1 rubygems
Version | Path |
---|---|
pika_que-0.1.5 | lib/pika_que/processor.rb |
pika_que-0.1.4 | lib/pika_que/processor.rb |