lib/cloud_crowd/daemon.rb in documentcloud-cloud-crowd-0.0.5 vs lib/cloud_crowd/daemon.rb in documentcloud-cloud-crowd-0.0.6
- old
+ new
@@ -1,9 +1,7 @@
CloudCrowd.configure(ENV['CLOUD_CROWD_CONFIG'])
-require 'cloud_crowd/worker'
-
module CloudCrowd
# A CloudCrowd::Daemon, started by the Daemons gem, runs a CloudCrowd::Worker in
# a loop, continually fetching and processing WorkUnits from the central
# server.
@@ -13,40 +11,82 @@
#
# The `crowd` command responds to all the usual methods that the Daemons gem
# supports.
class Daemon
- MIN_WAIT = CloudCrowd.config[:min_worker_wait]
- MAX_WAIT = CloudCrowd.config[:max_worker_wait]
- WAIT_MULTIPLIER = CloudCrowd.config[:worker_wait_multiplier]
+ # The back-off factor used to slow down requests for new work units
+ # when the queue is empty.
+ WAIT_MULTIPLIER = 1.5
+ MIN_WAIT = CloudCrowd.config[:min_worker_wait]
+ MAX_WAIT = CloudCrowd.config[:max_worker_wait]
+
def initialize
- @wait_time = MIN_WAIT
- @worker = Worker.new
- Signal.trap('INT', 'EXIT')
- Signal.trap('KILL', 'EXIT')
- Signal.trap('TERM', 'EXIT')
+ @wait_time = MIN_WAIT
+ @worker = Worker.new
+ Signal.trap('INT') { kill_worker_and_exit }
+ Signal.trap('KILL') { kill_worker_and_exit }
+ Signal.trap('TERM') { kill_worker_and_exit }
end
- # Loop forever, fetching WorkUnits.
- # TODO: Workers busy with their work units won't die until the unit has
- # been finished. This should probably be wrapped in an appropriately lengthy
- # timeout, or should be killable from the outside by terminating the thread.
- # In either case, nasty un-cleaned-up bits might be left behind.
+ # Spin up our worker and monitoring threads. The monitor's the boss, and
+ # will feel no compunction in killing the worker thread if necessary.
+ # Check in before starting up. If check in fails, there's no sense in going.
def run
- loop do
- @worker.fetch_work_unit
- if @worker.has_work?
- @wait_time = MIN_WAIT
- while @worker.has_work?
- @worker.run
- sleep 0.01 # So as to listen for incoming signals.
+ @worker.check_in('starting')
+ @work_thread = run_worker
+ @monitor_thread = run_monitor
+ @monitor_thread.join
+ end
+
+
+ private
+
+ # Loop forever, fetching WorkUnits and processing them.
+ def run_worker
+ Thread.new do
+ loop do
+ @worker.fetch_work_unit
+ if @worker.has_work?
+ @wait_time = MIN_WAIT
+ while @worker.has_work?
+ @worker.run
+ sleep 0.01 # So as to listen for incoming signals.
+ end
+ else
+ @wait_time = [@wait_time * WAIT_MULTIPLIER, MAX_WAIT].min
+ sleep @wait_time
end
- else
- @wait_time = [@wait_time * WAIT_MULTIPLIER, MAX_WAIT].min
- sleep @wait_time
end
end
+ end
+
+ # Checks in to let the central server know it's still alive every
+ # CHECK_IN_INTERVAL seconds. Restarts the work_thread if it has died.
+ def run_monitor
+ Thread.new do
+ sleep Worker::CHECK_IN_INTERVAL
+ loop do
+ @work_thread = run_monitor unless @work_thread.alive? || @exit_started
+ @worker.check_in(@work_thread.status)
+ sleep Worker::CHECK_IN_INTERVAL
+ end
+ end
+ end
+
+ def running?
+ @work_thread.alive? || @monitor_thread.alive?
+ end
+
+ # At exit, kill the worker thread, gently at first, then forcefully.
+ def kill_worker_and_exit
+ @worker.check_out
+ @exit_started = Time.now
+ @work_thread.kill && @monitor_thread.kill
+ sleep 0.3 while running? && Time.now - @exit_started < WORKER_EXIT_WAIT
+ return Process.exit unless running?
+ @work_thread.kill! && @monitor_thread.kill!
+ Process.exit
end
end
end
\ No newline at end of file