lib/cloud_crowd/worker.rb in cloud-crowd-0.3.0 vs lib/cloud_crowd/worker.rb in cloud-crowd-0.3.1
- old
+ new
@@ -1,53 +1,53 @@
module CloudCrowd
-
- # The Worker, forked off from the Node when a new WorkUnit is received,
+
+ # The Worker, forked off from the Node when a new WorkUnit is received,
# launches an Action for processing. Workers will only ever receive WorkUnits
- # that they are able to handle (for which they have a corresponding action in
- # their actions directory). If communication with the central server is
- # interrupted, the Worker will repeatedly attempt to complete its unit --
- # every Worker::RETRY_WAIT seconds. Any exceptions that take place during
- # the course of the Action will cause the Worker to mark the WorkUnit as
+ # that they are able to handle (for which they have a corresponding action in
+ # their actions directory). If communication with the central server is
+ # interrupted, the Worker will repeatedly attempt to complete its unit --
+ # every Worker::RETRY_WAIT seconds. Any exceptions that take place during
+ # the course of the Action will cause the Worker to mark the WorkUnit as
# having failed. When finished, the Worker's process exits, minimizing the
# potential for memory leaks.
class Worker
-
+
    # Wait five seconds to retry, after internal communication errors.
RETRY_WAIT = 5
-
+
attr_reader :pid, :node, :unit, :status
-
+
# A new Worker customizes itself to its WorkUnit at instantiation.
def initialize(node, unit)
@start_time = Time.now
@pid = $$
@node = node
@unit = unit
@status = @unit['status']
@retry_wait = RETRY_WAIT
end
-
+
# Return output to the central server, marking the WorkUnit done.
def complete_work_unit(result)
keep_trying_to "complete work unit" do
data = base_params.merge({:status => 'succeeded', :output => result})
@node.central["/work/#{data[:id]}"].put(data)
log "finished #{display_work_unit} in #{data[:time]} seconds"
end
end
-
+
# Mark the WorkUnit failed, returning the exception to central.
def fail_work_unit(exception)
keep_trying_to "mark work unit as failed" do
data = base_params.merge({:status => 'failed', :output => {'output' => exception.message}.to_json})
@node.central["/work/#{data[:id]}"].put(data)
log "failed #{display_work_unit} in #{data[:time]} seconds\n#{exception.message}\n#{exception.backtrace}"
end
end
-
+
# We expect and require internal communication between the central server
- # and the workers to succeed. If it fails for any reason, log it, and then
+ # and the workers to succeed. If it fails for any reason, log it, and then
# keep trying the same request.
def keep_trying_to(title)
begin
yield
rescue RestClient::ResourceNotFound => e
@@ -58,17 +58,17 @@
log e.backtrace
sleep @retry_wait
retry
end
end
-
+
# Loggable details describing what the Worker is up to.
def display_work_unit
"unit ##{@unit['id']} (#{@unit['action']}/#{CloudCrowd.display_status(@status)})"
end
-
- # Executes the WorkUnit by running the Action, catching all exceptions as
+
+ # Executes the WorkUnit by running the Action, catching all exceptions as
# failures. We capture the thread so that we can kill it from the outside,
# when exiting.
def run_work_unit
begin
result = nil
@@ -80,18 +80,18 @@
when SPLITTING then action.split
when MERGING then action.merge
else raise Error::StatusUnspecified, "work units must specify their status"
end
end
+ action.cleanup_work_directory if action
complete_work_unit({'output' => result}.to_json)
rescue Exception => e
- fail_work_unit(e)
- ensure
action.cleanup_work_directory if action
+ fail_work_unit(e)
end
end
-
+
# Run this worker inside of a fork. Attempts to exit cleanly.
# Wraps run_work_unit to benchmark the execution time, if requested.
def run
trap_signals
log "starting #{display_work_unit}"
@@ -100,50 +100,50 @@
else
run_work_unit
end
Process.exit!
end
-
- # There are some potentially important attributes of the WorkUnit that we'd
- # like to pass into the Action -- in case it needs to know them. They will
+
+ # There are some potentially important attributes of the WorkUnit that we'd
+ # like to pass into the Action -- in case it needs to know them. They will
# always be made available in the options hash.
def enhanced_unit_options
@unit['options'].merge({
'job_id' => @unit['job_id'],
'work_unit_id' => @unit['id'],
- 'attempts' => @unit['attempts']
+ 'attempts' => @unit['attempts']
})
end
-
+
# How long has this worker been running for?
def time_taken
Time.now - @start_time
end
-
-
+
+
private
-
- # Common parameters to send back to central upon unit completion,
+
+ # Common parameters to send back to central upon unit completion,
# regardless of success or failure.
def base_params
{ :pid => @pid,
- :id => @unit['id'],
+ :id => @unit['id'],
:time => time_taken }
end
-
+
# Log a message to the daemon log. Includes PID for identification.
def log(message)
puts "Worker ##{@pid}: #{message}" unless ENV['RACK_ENV'] == 'test'
end
-
+
# When signaled to exit, make sure that the Worker shuts down without firing
# the Node's at_exit callbacks.
def trap_signals
Signal.trap('QUIT') { Process.exit! }
Signal.trap('INT') { Process.exit! }
Signal.trap('KILL') { Process.exit! }
Signal.trap('TERM') { Process.exit! }
end
-
+
end
-
+
end
\ No newline at end of file
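
For orientation, here is a minimal, hypothetical sketch of how the interface shown in both versions is typically driven. It is not taken from either 0.3.0 or 0.3.1; the helper name fork_worker and the assumption that require 'cloud-crowd' is the gem's load path are illustrative only, and the unit hash keys are just the ones the Worker above actually reads.

    require 'cloud-crowd'   # assumed entry point for the gem

    # Hypothetical helper, not present in the gem: fork one Worker per
    # incoming WorkUnit, mirroring the lifecycle in the class comment
    # above (fork from the Node, run the unit, let the child exit).
    # `node` is the Node instance that received the unit; `unit` is the
    # WorkUnit's attribute hash with the string keys the Worker reads
    # ('id', 'job_id', 'action', 'status', 'attempts', 'options').
    def fork_worker(node, unit)
      pid = fork { CloudCrowd::Worker.new(node, unit).run }
      Process.detach(pid)   # the child calls Process.exit! on its own
      pid
    end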