lib/cloud_crowd/worker.rb in cloud-crowd-0.2.1 vs lib/cloud_crowd/worker.rb in cloud-crowd-0.2.2
- old
+ new
@@ -28,20 +28,20 @@
# Return output to the central server, marking the WorkUnit done.
def complete_work_unit(result)
keep_trying_to "complete work unit" do
data = base_params.merge({:status => 'succeeded', :output => result})
- @node.server["/work/#{data[:id]}"].put(data)
+ @node.central["/work/#{data[:id]}"].put(data)
log "finished #{display_work_unit} in #{data[:time]} seconds"
end
end
# Mark the WorkUnit failed, returning the exception to central.
def fail_work_unit(exception)
keep_trying_to "mark work unit as failed" do
data = base_params.merge({:status => 'failed', :output => {'output' => exception.message}.to_json})
- @node.server["/work/#{data[:id]}"].put(data)
+ @node.central["/work/#{data[:id]}"].put(data)
log "failed #{display_work_unit} in #{data[:time]} seconds\n#{exception.message}\n#{exception.backtrace}"
end
end
# We expect and require internal communication between the central server
@@ -68,39 +68,41 @@
# Executes the WorkUnit by running the Action, catching all exceptions as
# failures. We capture the thread so that we can kill it from the outside,
# when exiting.
def run_work_unit
- @worker_thread = Thread.new do
- begin
- result = nil
- action_class = CloudCrowd.actions[@unit['action']]
- action = action_class.new(@status, @unit['input'], enhanced_unit_options, @node.asset_store)
- Dir.chdir(action.work_directory) do
- result = case @status
- when PROCESSING then action.process
- when SPLITTING then action.split
- when MERGING then action.merge
- else raise Error::StatusUnspecified, "work units must specify their status"
- end
+ begin
+ result = nil
+ action_class = CloudCrowd.actions[@unit['action']]
+ action = action_class.new(@status, @unit['input'], enhanced_unit_options, @node.asset_store)
+ Dir.chdir(action.work_directory) do
+ result = case @status
+ when PROCESSING then action.process
+ when SPLITTING then action.split
+ when MERGING then action.merge
+ else raise Error::StatusUnspecified, "work units must specify their status"
end
- complete_work_unit({'output' => result}.to_json)
- rescue Exception => e
- fail_work_unit(e)
- ensure
- action.cleanup_work_directory if action
end
+ complete_work_unit({'output' => result}.to_json)
+ rescue Exception => e
+ fail_work_unit(e)
+ ensure
+ action.cleanup_work_directory if action
end
- @worker_thread.join
end
+ # Run this worker inside of a fork. Attempts to exit cleanly.
# Wraps run_work_unit to benchmark the execution time, if requested.
def run
trap_signals
log "starting #{display_work_unit}"
- return run_work_unit unless @unit['options']['benchmark']
- log("ran #{display_work_unit} in " + Benchmark.measure { run_work_unit }.to_s)
+ if @unit['options']['benchmark']
+ log("ran #{display_work_unit} in " + Benchmark.measure { run_work_unit }.to_s)
+ else
+ run_work_unit
+ end
+ Process.exit!
end
# There are some potentially important attributes of the WorkUnit that we'd
# like to pass into the Action -- in case it needs to know them. They will
# always be made available in the options hash.
@@ -131,25 +133,16 @@
# Log a message to the daemon log. Includes PID for identification.
def log(message)
puts "Worker ##{@pid}: #{message}" unless ENV['RACK_ENV'] == 'test'
end
- # When signaled to exit, make sure that the Worker shuts down cleanly.
+ # When signaled to exit, make sure that the Worker shuts down without firing
+ # the Node's at_exit callbacks.
def trap_signals
- Signal.trap('INT') { shut_down }
- Signal.trap('KILL') { shut_down }
- Signal.trap('TERM') { shut_down }
- end
-
- # Force the Worker to quit, even if it's in the middle of processing.
- # If it had a checked-out WorkUnit, the Node should have released it on
- # the central server already.
- def shut_down
- if @worker_thread
- @worker_thread.kill
- @worker_thread.kill! if @worker_thread.alive?
- end
- Process.exit
+ Signal.trap('QUIT') { Process.exit! }
+ Signal.trap('INT') { Process.exit! }
+ Signal.trap('KILL') { Process.exit! }
+ Signal.trap('TERM') { Process.exit! }
end
end
end
\ No newline at end of file