lib/cloud_crowd/worker.rb in cloud-crowd-0.2.1 vs lib/cloud_crowd/worker.rb in cloud-crowd-0.2.2

- old
+ new

@@ -28,20 +28,20 @@ # Return output to the central server, marking the WorkUnit done. def complete_work_unit(result) keep_trying_to "complete work unit" do data = base_params.merge({:status => 'succeeded', :output => result}) - @node.server["/work/#{data[:id]}"].put(data) + @node.central["/work/#{data[:id]}"].put(data) log "finished #{display_work_unit} in #{data[:time]} seconds" end end # Mark the WorkUnit failed, returning the exception to central. def fail_work_unit(exception) keep_trying_to "mark work unit as failed" do data = base_params.merge({:status => 'failed', :output => {'output' => exception.message}.to_json}) - @node.server["/work/#{data[:id]}"].put(data) + @node.central["/work/#{data[:id]}"].put(data) log "failed #{display_work_unit} in #{data[:time]} seconds\n#{exception.message}\n#{exception.backtrace}" end end # We expect and require internal communication between the central server @@ -68,39 +68,41 @@ # Executes the WorkUnit by running the Action, catching all exceptions as # failures. We capture the thread so that we can kill it from the outside, # when exiting. def run_work_unit - @worker_thread = Thread.new do - begin - result = nil - action_class = CloudCrowd.actions[@unit['action']] - action = action_class.new(@status, @unit['input'], enhanced_unit_options, @node.asset_store) - Dir.chdir(action.work_directory) do - result = case @status - when PROCESSING then action.process - when SPLITTING then action.split - when MERGING then action.merge - else raise Error::StatusUnspecified, "work units must specify their status" - end + begin + result = nil + action_class = CloudCrowd.actions[@unit['action']] + action = action_class.new(@status, @unit['input'], enhanced_unit_options, @node.asset_store) + Dir.chdir(action.work_directory) do + result = case @status + when PROCESSING then action.process + when SPLITTING then action.split + when MERGING then action.merge + else raise Error::StatusUnspecified, "work units must specify their status" end - complete_work_unit({'output' => result}.to_json) - rescue Exception => e - fail_work_unit(e) - ensure - action.cleanup_work_directory if action end + complete_work_unit({'output' => result}.to_json) + rescue Exception => e + fail_work_unit(e) + ensure + action.cleanup_work_directory if action end - @worker_thread.join end + # Run this worker inside of a fork. Attempts to exit cleanly. # Wraps run_work_unit to benchmark the execution time, if requested. def run trap_signals log "starting #{display_work_unit}" - return run_work_unit unless @unit['options']['benchmark'] - log("ran #{display_work_unit} in " + Benchmark.measure { run_work_unit }.to_s) + if @unit['options']['benchmark'] + log("ran #{display_work_unit} in " + Benchmark.measure { run_work_unit }.to_s) + else + run_work_unit + end + Process.exit! end # There are some potentially important attributes of the WorkUnit that we'd # like to pass into the Action -- in case it needs to know them. They will # always be made available in the options hash. @@ -131,25 +133,16 @@ # Log a message to the daemon log. Includes PID for identification. def log(message) puts "Worker ##{@pid}: #{message}" unless ENV['RACK_ENV'] == 'test' end - # When signaled to exit, make sure that the Worker shuts down cleanly. + # When signaled to exit, make sure that the Worker shuts down without firing + # the Node's at_exit callbacks. def trap_signals - Signal.trap('INT') { shut_down } - Signal.trap('KILL') { shut_down } - Signal.trap('TERM') { shut_down } - end - - # Force the Worker to quit, even if it's in the middle of processing. - # If it had a checked-out WorkUnit, the Node should have released it on - # the central server already. - def shut_down - if @worker_thread - @worker_thread.kill - @worker_thread.kill! if @worker_thread.alive? - end - Process.exit + Signal.trap('QUIT') { Process.exit! } + Signal.trap('INT') { Process.exit! } + Signal.trap('KILL') { Process.exit! } + Signal.trap('TERM') { Process.exit! } end end end \ No newline at end of file