lib/cloud_crowd/worker.rb in cloud-crowd-0.2.0 vs lib/cloud_crowd/worker.rb in cloud-crowd-0.2.1

- old
+ new

@@ -12,19 +12,20 @@ class Worker # Wait five seconds to retry, after internal communcication errors. RETRY_WAIT = 5 - attr_reader :action + attr_reader :pid, :node, :unit, :status - # A new Worker begins processing its WorkUnit straight off. - def initialize(node, work_unit) - @pid = $$ - @node = node - trap_signals - setup_work_unit(work_unit) - run + # A new Worker customizes itself to its WorkUnit at instantiation. + def initialize(node, unit) + @start_time = Time.now + @pid = $$ + @node = node + @unit = unit + @status = @unit['status'] + @retry_wait = RETRY_WAIT end # Return output to the central server, marking the WorkUnit done. def complete_work_unit(result) keep_trying_to "complete work unit" do @@ -47,77 +48,85 @@ # and the workers to succeed. If it fails for any reason, log it, and then # keep trying the same request. def keep_trying_to(title) begin yield + rescue RestClient::ResourceNotFound => e + log "work unit ##{@unit['id']} doesn't exist. discarding..." rescue Exception => e - log "failed to #{title} -- retry in #{RETRY_WAIT} seconds" + log "failed to #{title} -- retry in #{@retry_wait} seconds" log e.message log e.backtrace - sleep RETRY_WAIT + sleep @retry_wait retry end end # Loggable details describing what the Worker is up to. def display_work_unit - "unit ##{@options['work_unit_id']} (#{@action_name}/#{CloudCrowd.display_status(@status)})" + "unit ##{@unit['id']} (#{@unit['action']}/#{CloudCrowd.display_status(@status)})" end # Executes the WorkUnit by running the Action, catching all exceptions as # failures. We capture the thread so that we can kill it from the outside, # when exiting. def run_work_unit @worker_thread = Thread.new do begin result = nil - @action = CloudCrowd.actions[@action_name].new(@status, @input, @options, @node.asset_store) - Dir.chdir(@action.work_directory) do + action_class = CloudCrowd.actions[@unit['action']] + action = action_class.new(@status, @unit['input'], enhanced_unit_options, @node.asset_store) + Dir.chdir(action.work_directory) do result = case @status - when PROCESSING then @action.process - when SPLITTING then @action.split - when MERGING then @action.merge + when PROCESSING then action.process + when SPLITTING then action.split + when MERGING then action.merge else raise Error::StatusUnspecified, "work units must specify their status" end end complete_work_unit({'output' => result}.to_json) rescue Exception => e fail_work_unit(e) ensure - @action.cleanup_work_directory + action.cleanup_work_directory if action end end @worker_thread.join end # Wraps run_work_unit to benchmark the execution time, if requested. def run - return run_work_unit unless @options['benchmark'] - status = CloudCrowd.display_status(@status) - log("ran #{@action_name}/#{status} in " + Benchmark.measure { run_work_unit }.to_s) + trap_signals + log "starting #{display_work_unit}" + return run_work_unit unless @unit['options']['benchmark'] + log("ran #{display_work_unit} in " + Benchmark.measure { run_work_unit }.to_s) end + # There are some potentially important attributes of the WorkUnit that we'd + # like to pass into the Action -- in case it needs to know them. They will + # always be made available in the options hash. + def enhanced_unit_options + @unit['options'].merge({ + 'job_id' => @unit['job_id'], + 'work_unit_id' => @unit['id'], + 'attempts' => @unit['attempts'] + }) + end + # How long has this worker been running for? + def time_taken + Time.now - @start_time + end + + private # Common parameters to send back to central upon unit completion, # regardless of success or failure. def base_params { :pid => @pid, - :id => @options['work_unit_id'], - :time => Time.now - @start_time } - end - - # Extract the Worker's instance variables from a WorkUnit's JSON. - def setup_work_unit(unit) - return false unless unit - @start_time = Time.now - @action_name, @input, @options, @status = unit['action'], unit['input'], unit['options'], unit['status'] - @options['job_id'] = unit['job_id'] - @options['work_unit_id'] = unit['id'] - @options['attempts'] ||= unit['attempts'] - log "fetched #{display_work_unit}" - return true + :id => @unit['id'], + :time => time_taken } end # Log a message to the daemon log. Includes PID for identification. def log(message) puts "Worker ##{@pid}: #{message}" unless ENV['RACK_ENV'] == 'test' \ No newline at end of file