lib/daemonic/pool.rb in daemonic-0.1.0 vs lib/daemonic/pool.rb in daemonic-0.1.1

- old
+ new

@@ -11,23 +11,24 @@ end STOP_SIGNAL = StopSignal.new - def initialize(thread_count, worker, logger) - @worker = worker - @jobs = SizedQueue.new(thread_count) - @logger = logger - @threads = thread_count.times.map {|worker_num| + attr_reader :producer + + def initialize(producer) + @producer = producer + @jobs = SizedQueue.new(producer.queue_size) + @threads = producer.concurrency.times.map {|worker_num| Thread.new do dispatch(worker_num) end } end def enqueue(job) - @logger.debug { "Enqueueing #{job.inspect}" } + logger.debug { "Enqueueing #{job.inspect}" } @jobs.push(job) end alias_method :<<, :enqueue def stop @@ -38,28 +39,40 @@ end private def dispatch(worker_num) - @logger.debug { "T#{worker_num}: Starting" } + logger.debug { "T#{worker_num}: Starting" } loop do job = @jobs.pop if STOP_SIGNAL.equal?(job) - @logger.debug { "T#{worker_num}: Received stop signal, terminating." } + logger.debug { "T#{worker_num}: Received stop signal, terminating." } break end begin - @logger.debug { "T#{worker_num}: Consuming #{job.inspect}" } - @worker.consume(job) + logger.debug { "T#{worker_num}: Consuming #{job.inspect}" } + worker.consume(job) Thread.pass rescue Object => error - @logger.warn { "T#{worker_num}: Error while processing #{job}: #{error.class}: #{error}" } - @logger.info { error.backtrace.join("\n") } + if error.is_a?(SystemExit) # allow app to exit + logger.warn { "T#{worker_num}: Received SystemExit, shutting down" } + producer.stop + else + logger.warn { "T#{worker_num}: #{error.class} while processing #{job}: #{error}" } + logger.info { error.backtrace.join("\n") } + end Thread.pass end end - @logger.debug { "T#{worker_num}: Stopped" } + logger.debug { "T#{worker_num}: Stopped" } end + def worker + producer.worker + end + + def logger + producer.logger + end end end