lib/daemonic/pool.rb in daemonic-0.0.2 vs lib/daemonic/pool.rb in daemonic-0.1.0

- old
+ new

@@ -1,104 +1,65 @@ -require 'daemonic/logging' -require 'daemonic/worker' - +# Stolen from RubyTapas by Avdi Grimm, episode 145. module Daemonic class Pool - include Logging - attr_reader :config + class StopSignal - attr_reader :workers, :desired_workers + def inspect + "[STOP SIGNAL]" + end + alias_method :to_s, :inspect - def initialize(config) - @config = config - @workers = [] - reload_desired_workers end - def start - wait_for global_timeout do - increase if count < desired_workers - count == desired_workers - end - decrease while count > desired_workers + STOP_SIGNAL = StopSignal.new + + def initialize(thread_count, worker, logger) + @worker = worker + @jobs = SizedQueue.new(thread_count) + @logger = logger + @threads = thread_count.times.map {|worker_num| + Thread.new do + dispatch(worker_num) + end + } end - def restart - workers.each do |worker| - worker.restart - yield worker if block_given? - end + def enqueue(job) + @logger.debug { "Enqueueing #{job.inspect}" } + @jobs.push(job) end + alias_method :<<, :enqueue def stop - workers.each do |worker| - worker.stop - yield worker if block_given? + @threads.size.times do + enqueue(STOP_SIGNAL) end + @threads.each(&:join) end - def hup - reload_desired_workers - workers.each(&:hup) - start - end - - def count - workers.count { |worker| worker.running? } - end - - def increase - workers << start_worker(workers.size) - end - - def increase! - @desired_workers += 1 - increase - end - - def decrease - workers.pop.stop - end - - def decrease! - @desired_workers -= 1 - decrease - end - - def monitor - workers.each(&:monitor) - end - private - def start_worker(num) - Worker.new( - index: num, - config: config, - ).tap(&:start) - end - - def reload_desired_workers - @desired_workers = config.workers - end - - def global_timeout - (desired_workers * 2) + 1 - end - - def wait_for(timeout=2) - deadline = Time.now + timeout - until Time.now >= deadline - result = yield - if result - return - else - sleep 0.1 + def dispatch(worker_num) + @logger.debug { "T#{worker_num}: Starting" } + loop do + job = @jobs.pop + if STOP_SIGNAL.equal?(job) + @logger.debug { "T#{worker_num}: Received stop signal, terminating." } + break end + begin + @logger.debug { "T#{worker_num}: Consuming #{job.inspect}" } + @worker.consume(job) + Thread.pass + rescue Object => error + @logger.warn { "T#{worker_num}: Error while processing #{job}: #{error.class}: #{error}" } + @logger.info { error.backtrace.join("\n") } + Thread.pass + end end - fatal "Unable to get to boot the right amount of workers. Running: #{count}, desired: #{desired_workers}." - stop - exit 1 + @logger.debug { "T#{worker_num}: Stopped" } end + + end end