lib/daemonic/pool.rb in daemonic-0.1.0 vs lib/daemonic/pool.rb in daemonic-0.1.1
- old
+ new
@@ -11,23 +11,24 @@
end
STOP_SIGNAL = StopSignal.new
- def initialize(thread_count, worker, logger)
- @worker = worker
- @jobs = SizedQueue.new(thread_count)
- @logger = logger
- @threads = thread_count.times.map {|worker_num|
+ attr_reader :producer
+
+ def initialize(producer)
+ @producer = producer
+ @jobs = SizedQueue.new(producer.queue_size)
+ @threads = producer.concurrency.times.map {|worker_num|
Thread.new do
dispatch(worker_num)
end
}
end
def enqueue(job)
- @logger.debug { "Enqueueing #{job.inspect}" }
+ logger.debug { "Enqueueing #{job.inspect}" }
@jobs.push(job)
end
alias_method :<<, :enqueue
def stop
@@ -38,28 +39,40 @@
end
private
def dispatch(worker_num)
- @logger.debug { "T#{worker_num}: Starting" }
+ logger.debug { "T#{worker_num}: Starting" }
loop do
job = @jobs.pop
if STOP_SIGNAL.equal?(job)
- @logger.debug { "T#{worker_num}: Received stop signal, terminating." }
+ logger.debug { "T#{worker_num}: Received stop signal, terminating." }
break
end
begin
- @logger.debug { "T#{worker_num}: Consuming #{job.inspect}" }
- @worker.consume(job)
+ logger.debug { "T#{worker_num}: Consuming #{job.inspect}" }
+ worker.consume(job)
Thread.pass
rescue Object => error
- @logger.warn { "T#{worker_num}: Error while processing #{job}: #{error.class}: #{error}" }
- @logger.info { error.backtrace.join("\n") }
+ if error.is_a?(SystemExit) # allow app to exit
+ logger.warn { "T#{worker_num}: Received SystemExit, shutting down" }
+ producer.stop
+ else
+ logger.warn { "T#{worker_num}: #{error.class} while processing #{job}: #{error}" }
+ logger.info { error.backtrace.join("\n") }
+ end
Thread.pass
end
end
- @logger.debug { "T#{worker_num}: Stopped" }
+ logger.debug { "T#{worker_num}: Stopped" }
end
+ def worker
+ producer.worker
+ end
+
+ def logger
+ producer.logger
+ end
end
end