lib/cloud_crowd/worker.rb in documentcloud-cloud-crowd-0.0.2 vs lib/cloud_crowd/worker.rb in documentcloud-cloud-crowd-0.0.3

- old
+ new

@@ -20,32 +20,33 @@ # Ask the central server for a new WorkUnit. def fetch_work_unit keep_trying_to "fetch a new work unit" do unit_json = @server['/work'].get - return unless unit_json # No content means no work for us. - @start_time = Time.now - parse_work_unit unit_json - log "fetched work unit for #{@action_name}" + setup_work_unit(unit_json) end end # Return output to the central server, marking the current work unit as done. def complete_work_unit(result) keep_trying_to "complete work unit" do data = completion_params.merge({:status => 'succeeded', :output => result}) - @server["/work/#{data[:id]}"].put(data) + unit_json = @server["/work/#{data[:id]}"].put(data) log "finished #{@action_name} in #{data[:time]} seconds" + clear_work_unit + setup_work_unit(unit_json) end end # Mark the current work unit as failed, returning the exception to central. def fail_work_unit(exception) keep_trying_to "mark work unit as failed" do data = completion_params.merge({:status => 'failed', :output => exception.message}) - @server["/work/#{data[:id]}"].put(data) + unit_json = @server["/work/#{data[:id]}"].put(data) log "failed #{@action_name} in #{data[:time]} seconds\n#{exception.message}\n#{exception.backtrace}" + clear_work_unit + setup_work_unit(unit_json) end end def keep_trying_to(title) begin @@ -76,12 +77,10 @@ else raise "Work units must specify their status." end complete_work_unit(result) rescue Exception => e fail_work_unit(e) - ensure - clear_work_unit end end # Wraps +run_work_unit+ to benchmark the execution time, if requested. def run @@ -105,15 +104,19 @@ def completion_params {:id => @options['work_unit_id'], :time => Time.now - @start_time} end # Extract our instance variables from a WorkUnit's JSON. - def parse_work_unit(unit_json) + def setup_work_unit(unit_json) + return false unless unit_json unit = JSON.parse(unit_json) + @start_time = Time.now @action_name, @input, @options, @status = unit['action'], unit['input'], unit['options'], unit['status'] @options['job_id'] = unit['job_id'] @options['work_unit_id'] = unit['id'] @options['attempts'] ||= unit['attempts'] + log "fetched work unit for #{@action_name}" + return true end # Log a message to the daemon log. Includes PID for identification. def log(message) puts "Worker ##{@id}: #{message}" \ No newline at end of file