lib/daemonic/pool.rb in daemonic-0.0.2 vs lib/daemonic/pool.rb in daemonic-0.1.0
- old
+ new
@@ -1,104 +1,65 @@
-require 'daemonic/logging'
-require 'daemonic/worker'
-
+# Stolen from RubyTapas by Avdi Grimm, episode 145.
module Daemonic
class Pool
- include Logging
- attr_reader :config
+ class StopSignal
- attr_reader :workers, :desired_workers
+ def inspect
+ "[STOP SIGNAL]"
+ end
+ alias_method :to_s, :inspect
- def initialize(config)
- @config = config
- @workers = []
- reload_desired_workers
end
- def start
- wait_for global_timeout do
- increase if count < desired_workers
- count == desired_workers
- end
- decrease while count > desired_workers
+ STOP_SIGNAL = StopSignal.new
+
+ def initialize(thread_count, worker, logger)
+ @worker = worker
+ @jobs = SizedQueue.new(thread_count)
+ @logger = logger
+ @threads = thread_count.times.map {|worker_num|
+ Thread.new do
+ dispatch(worker_num)
+ end
+ }
end
- def restart
- workers.each do |worker|
- worker.restart
- yield worker if block_given?
- end
+ def enqueue(job)
+ @logger.debug { "Enqueueing #{job.inspect}" }
+ @jobs.push(job)
end
+ alias_method :<<, :enqueue
def stop
- workers.each do |worker|
- worker.stop
- yield worker if block_given?
+ @threads.size.times do
+ enqueue(STOP_SIGNAL)
end
+ @threads.each(&:join)
end
- def hup
- reload_desired_workers
- workers.each(&:hup)
- start
- end
-
- def count
- workers.count { |worker| worker.running? }
- end
-
- def increase
- workers << start_worker(workers.size)
- end
-
- def increase!
- @desired_workers += 1
- increase
- end
-
- def decrease
- workers.pop.stop
- end
-
- def decrease!
- @desired_workers -= 1
- decrease
- end
-
- def monitor
- workers.each(&:monitor)
- end
-
private
- def start_worker(num)
- Worker.new(
- index: num,
- config: config,
- ).tap(&:start)
- end
-
- def reload_desired_workers
- @desired_workers = config.workers
- end
-
- def global_timeout
- (desired_workers * 2) + 1
- end
-
- def wait_for(timeout=2)
- deadline = Time.now + timeout
- until Time.now >= deadline
- result = yield
- if result
- return
- else
- sleep 0.1
+ def dispatch(worker_num)
+ @logger.debug { "T#{worker_num}: Starting" }
+ loop do
+ job = @jobs.pop
+ if STOP_SIGNAL.equal?(job)
+ @logger.debug { "T#{worker_num}: Received stop signal, terminating." }
+ break
end
+ begin
+ @logger.debug { "T#{worker_num}: Consuming #{job.inspect}" }
+ @worker.consume(job)
+ Thread.pass
+ rescue Object => error
+ @logger.warn { "T#{worker_num}: Error while processing #{job}: #{error.class}: #{error}" }
+ @logger.info { error.backtrace.join("\n") }
+ Thread.pass
+ end
end
- fatal "Unable to get to boot the right amount of workers. Running: #{count}, desired: #{desired_workers}."
- stop
- exit 1
+ @logger.debug { "T#{worker_num}: Stopped" }
end
+
+
end
end