Sha256: 1177edd4068aebd035750293f17de9bf5b6bb18404c8e8a24327772830829f9c
Contents?: true
Size: 1.71 KB
Versions: 3
Compression:
Stored size: 1.71 KB
Contents
# Stolen from RubyTapas by Avdi Grimm, episode 145. module Daemonic class Pool class StopSignal def inspect "[STOP SIGNAL]" end alias_method :to_s, :inspect end STOP_SIGNAL = StopSignal.new attr_reader :producer def initialize(producer) @producer = producer @jobs = SizedQueue.new(producer.queue_size) @threads = producer.concurrency.times.map {|worker_num| Thread.new do dispatch(worker_num) end } end def enqueue(job) logger.debug { "Enqueueing #{job.inspect}" } @jobs.push(job) end alias_method :<<, :enqueue def stop @threads.size.times do enqueue(STOP_SIGNAL) end @threads.each(&:join) end private def dispatch(worker_num) logger.debug { "T#{worker_num}: Starting" } loop do job = @jobs.pop if STOP_SIGNAL.equal?(job) logger.debug { "T#{worker_num}: Received stop signal, terminating." } break end begin logger.debug { "T#{worker_num}: Consuming #{job.inspect}" } worker.consume(job) Thread.pass rescue Object => error if error.is_a?(SystemExit) # allow app to exit logger.warn { "T#{worker_num}: Received SystemExit, shutting down" } producer.stop else logger.warn { "T#{worker_num}: #{error.class} while processing #{job}: #{error}" } logger.info { error.backtrace.join("\n") } end Thread.pass end end logger.debug { "T#{worker_num}: Stopped" } end def worker producer.worker end def logger producer.logger end end end
Version data entries
3 entries across 3 versions & 1 rubygems
Version | Path |
---|---|
daemonic-0.1.3 | lib/daemonic/pool.rb |
daemonic-0.1.2 | lib/daemonic/pool.rb |
daemonic-0.1.1 | lib/daemonic/pool.rb |