lib/cloud_crowd/worker.rb in cloud-crowd-0.2.0 vs lib/cloud_crowd/worker.rb in cloud-crowd-0.2.1
- old
+ new
@@ -12,19 +12,20 @@
class Worker
# Wait five seconds to retry, after internal communcication errors.
RETRY_WAIT = 5
- attr_reader :action
+ attr_reader :pid, :node, :unit, :status
- # A new Worker begins processing its WorkUnit straight off.
- def initialize(node, work_unit)
- @pid = $$
- @node = node
- trap_signals
- setup_work_unit(work_unit)
- run
+ # A new Worker customizes itself to its WorkUnit at instantiation.
+ def initialize(node, unit)
+ @start_time = Time.now
+ @pid = $$
+ @node = node
+ @unit = unit
+ @status = @unit['status']
+ @retry_wait = RETRY_WAIT
end
# Return output to the central server, marking the WorkUnit done.
def complete_work_unit(result)
keep_trying_to "complete work unit" do
@@ -47,77 +48,85 @@
# and the workers to succeed. If it fails for any reason, log it, and then
# keep trying the same request.
def keep_trying_to(title)
begin
yield
+ rescue RestClient::ResourceNotFound => e
+ log "work unit ##{@unit['id']} doesn't exist. discarding..."
rescue Exception => e
- log "failed to #{title} -- retry in #{RETRY_WAIT} seconds"
+ log "failed to #{title} -- retry in #{@retry_wait} seconds"
log e.message
log e.backtrace
- sleep RETRY_WAIT
+ sleep @retry_wait
retry
end
end
# Loggable details describing what the Worker is up to.
def display_work_unit
- "unit ##{@options['work_unit_id']} (#{@action_name}/#{CloudCrowd.display_status(@status)})"
+ "unit ##{@unit['id']} (#{@unit['action']}/#{CloudCrowd.display_status(@status)})"
end
# Executes the WorkUnit by running the Action, catching all exceptions as
# failures. We capture the thread so that we can kill it from the outside,
# when exiting.
def run_work_unit
@worker_thread = Thread.new do
begin
result = nil
- @action = CloudCrowd.actions[@action_name].new(@status, @input, @options, @node.asset_store)
- Dir.chdir(@action.work_directory) do
+ action_class = CloudCrowd.actions[@unit['action']]
+ action = action_class.new(@status, @unit['input'], enhanced_unit_options, @node.asset_store)
+ Dir.chdir(action.work_directory) do
result = case @status
- when PROCESSING then @action.process
- when SPLITTING then @action.split
- when MERGING then @action.merge
+ when PROCESSING then action.process
+ when SPLITTING then action.split
+ when MERGING then action.merge
else raise Error::StatusUnspecified, "work units must specify their status"
end
end
complete_work_unit({'output' => result}.to_json)
rescue Exception => e
fail_work_unit(e)
ensure
- @action.cleanup_work_directory
+ action.cleanup_work_directory if action
end
end
@worker_thread.join
end
# Wraps run_work_unit to benchmark the execution time, if requested.
def run
- return run_work_unit unless @options['benchmark']
- status = CloudCrowd.display_status(@status)
- log("ran #{@action_name}/#{status} in " + Benchmark.measure { run_work_unit }.to_s)
+ trap_signals
+ log "starting #{display_work_unit}"
+ return run_work_unit unless @unit['options']['benchmark']
+ log("ran #{display_work_unit} in " + Benchmark.measure { run_work_unit }.to_s)
end
+ # There are some potentially important attributes of the WorkUnit that we'd
+ # like to pass into the Action -- in case it needs to know them. They will
+ # always be made available in the options hash.
+ def enhanced_unit_options
+ @unit['options'].merge({
+ 'job_id' => @unit['job_id'],
+ 'work_unit_id' => @unit['id'],
+ 'attempts' => @unit['attempts']
+ })
+ end
+ # How long has this worker been running for?
+ def time_taken
+ Time.now - @start_time
+ end
+
+
private
# Common parameters to send back to central upon unit completion,
# regardless of success or failure.
def base_params
{ :pid => @pid,
- :id => @options['work_unit_id'],
- :time => Time.now - @start_time }
- end
-
- # Extract the Worker's instance variables from a WorkUnit's JSON.
- def setup_work_unit(unit)
- return false unless unit
- @start_time = Time.now
- @action_name, @input, @options, @status = unit['action'], unit['input'], unit['options'], unit['status']
- @options['job_id'] = unit['job_id']
- @options['work_unit_id'] = unit['id']
- @options['attempts'] ||= unit['attempts']
- log "fetched #{display_work_unit}"
- return true
+ :id => @unit['id'],
+ :time => time_taken }
end
# Log a message to the daemon log. Includes PID for identification.
def log(message)
puts "Worker ##{@pid}: #{message}" unless ENV['RACK_ENV'] == 'test'
\ No newline at end of file