lib/cloud_crowd/worker.rb in documentcloud-cloud-crowd-0.0.2 vs lib/cloud_crowd/worker.rb in documentcloud-cloud-crowd-0.0.3
- old
+ new
@@ -20,32 +20,33 @@
# Ask the central server for a new WorkUnit.
def fetch_work_unit
keep_trying_to "fetch a new work unit" do
unit_json = @server['/work'].get
- return unless unit_json # No content means no work for us.
- @start_time = Time.now
- parse_work_unit unit_json
- log "fetched work unit for #{@action_name}"
+ setup_work_unit(unit_json)
end
end
# Return output to the central server, marking the current work unit as done.
def complete_work_unit(result)
keep_trying_to "complete work unit" do
data = completion_params.merge({:status => 'succeeded', :output => result})
- @server["/work/#{data[:id]}"].put(data)
+ unit_json = @server["/work/#{data[:id]}"].put(data)
log "finished #{@action_name} in #{data[:time]} seconds"
+ clear_work_unit
+ setup_work_unit(unit_json)
end
end
# Mark the current work unit as failed, returning the exception to central.
def fail_work_unit(exception)
keep_trying_to "mark work unit as failed" do
data = completion_params.merge({:status => 'failed', :output => exception.message})
- @server["/work/#{data[:id]}"].put(data)
+ unit_json = @server["/work/#{data[:id]}"].put(data)
log "failed #{@action_name} in #{data[:time]} seconds\n#{exception.message}\n#{exception.backtrace}"
+ clear_work_unit
+ setup_work_unit(unit_json)
end
end
def keep_trying_to(title)
begin
@@ -76,12 +77,10 @@
else raise "Work units must specify their status."
end
complete_work_unit(result)
rescue Exception => e
fail_work_unit(e)
- ensure
- clear_work_unit
end
end
# Wraps +run_work_unit+ to benchmark the execution time, if requested.
def run
@@ -105,15 +104,19 @@
def completion_params
{:id => @options['work_unit_id'], :time => Time.now - @start_time}
end
# Extract our instance variables from a WorkUnit's JSON.
- def parse_work_unit(unit_json)
+ def setup_work_unit(unit_json)
+ return false unless unit_json
unit = JSON.parse(unit_json)
+ @start_time = Time.now
@action_name, @input, @options, @status = unit['action'], unit['input'], unit['options'], unit['status']
@options['job_id'] = unit['job_id']
@options['work_unit_id'] = unit['id']
@options['attempts'] ||= unit['attempts']
+ log "fetched work unit for #{@action_name}"
+ return true
end
# Log a message to the daemon log. Includes PID for identification.
def log(message)
puts "Worker ##{@id}: #{message}"
\ No newline at end of file