lib/cloud_crowd/daemon.rb in documentcloud-cloud-crowd-0.0.5 vs lib/cloud_crowd/daemon.rb in documentcloud-cloud-crowd-0.0.6

- old
+ new

CloudCrowd.configure(ENV['CLOUD_CROWD_CONFIG'])

module CloudCrowd

  # A CloudCrowd::Daemon, started by the Daemons gem, runs a CloudCrowd::Worker in
  # a loop, continually fetching and processing WorkUnits from the central
  # server.
  #
  # The `crowd` command responds to all the usual methods that the Daemons gem
  # supports.
  class Daemon

    # The back-off factor used to slow down requests for new work units
    # when the queue is empty.
    WAIT_MULTIPLIER = 1.5

    # Lower and upper bounds for the polling wait, read from the CloudCrowd
    # configuration.
    MIN_WAIT        = CloudCrowd.config[:min_worker_wait]
    MAX_WAIT        = CloudCrowd.config[:max_worker_wait]

    # Start with the shortest polling wait, build the worker, and route INT
    # and TERM through kill_worker_and_exit.
    def initialize
      @wait_time = MIN_WAIT
      @worker    = Worker.new
      # SIGKILL is intentionally not trapped: the operating system never
      # delivers it to a handler, so a trap for it could not run.
      Signal.trap('INT')  { kill_worker_and_exit }
      Signal.trap('TERM') { kill_worker_and_exit }
    end

    # Spin up our worker and monitoring threads. The monitor's the boss, and
    # will feel no compunction in killing the worker thread if necessary.
    # Check in before starting up. If check in fails, there's no sense in going.
    # Blocks the calling thread until the monitor thread finishes.
    def run
      @worker.check_in('starting')
      @work_thread    = run_worker
      @monitor_thread = run_monitor
      @monitor_thread.join
    end


    private

    # Loop forever, fetching WorkUnits and processing them. When no work is
    # available the wait grows by WAIT_MULTIPLIER, capped at MAX_WAIT; as soon
    # as work shows up it drops back to MIN_WAIT. Returns the new Thread.
    def run_worker
      Thread.new do
        loop do
          @worker.fetch_work_unit
          if @worker.has_work?
            @wait_time = MIN_WAIT
            while @worker.has_work?
              @worker.run
              sleep 0.01 # So as to listen for incoming signals.
            end
          else
            @wait_time = [@wait_time * WAIT_MULTIPLIER, MAX_WAIT].min
            sleep @wait_time
          end
        end
      end
    end

    # Checks in to let the central server know it's still alive every
    # CHECK_IN_INTERVAL seconds. Restarts the work_thread if it has died,
    # unless an exit is already under way. Returns the new Thread.
    def run_monitor
      Thread.new do
        sleep Worker::CHECK_IN_INTERVAL
        loop do
          # Restart the *worker* here; spawning another monitor would leave
          # the daemon with no work thread at all.
          @work_thread = run_worker unless @work_thread.alive? || @exit_started
          @worker.check_in(@work_thread.status)
          sleep Worker::CHECK_IN_INTERVAL
        end
      end
    end

    # True while either the work thread or the monitor thread is still alive.
    def running?
      @work_thread.alive? || @monitor_thread.alive?
    end

    # At exit, kill the worker thread, gently at first, then forcefully.
    # Checks the worker out, asks both threads to stop, polls every 0.3s until
    # they are dead or WORKER_EXIT_WAIT seconds have elapsed, then exits the
    # process.
    #
    # NOTE(review): WORKER_EXIT_WAIT is not defined in this file; confirm it is
    # resolvable from here (e.g. defined on CloudCrowd), otherwise this raises
    # NameError as soon as the threads take longer than one check to die.
    # NOTE(review): Thread#kill! appears to exist only on older Rubies; confirm
    # the target Ruby version before relying on the forceful path.
    def kill_worker_and_exit
      @worker.check_out
      @exit_started = Time.now
      @work_thread.kill && @monitor_thread.kill
      sleep 0.3 while running? && Time.now - @exit_started < WORKER_EXIT_WAIT
      return Process.exit unless running?
      @work_thread.kill! && @monitor_thread.kill!
      Process.exit
    end

  end

end