lib/cloud_crowd/worker.rb in documentcloud-cloud-crowd-0.0.4 vs lib/cloud_crowd/worker.rb in documentcloud-cloud-crowd-0.0.5
- old
+ new
@@ -1,26 +1,35 @@
module CloudCrowd
+ # The Worker, run at intervals by the Daemon, fetches WorkUnits from the
+ # central server and dispatches Actions to process them. Workers only fetch
+ # units that they are able to handle (for which they have an action in their
+ # actions directory). If communication with the central server is interrupted,
+ # the Worker will repeatedly attempt to complete its unit -- every
+ # +worker_retry_wait+ seconds. Any exceptions that take place during
+ # the course of the Action will cause the Worker to mark the WorkUnit as
+ # having failed.
class Worker
attr_reader :action
# Spinning up a worker will create a new AssetStore with a persistent
# connection to S3. This AssetStore gets passed into each action, for use
# as it is run.
def initialize
- @id = $$
- @hostname = Socket.gethostname
- @store = AssetStore.new
- @server = CloudCrowd.central_server
+ @id = $$
+ @hostname = Socket.gethostname
+ @store = AssetStore.new
+ @server = CloudCrowd.central_server
+ @enabled_actions = CloudCrowd.actions.keys
log 'started'
end
- # Ask the central server for a new WorkUnit.
+ # Ask the central server for the first WorkUnit in line.
def fetch_work_unit
keep_trying_to "fetch a new work unit" do
- unit_json = @server['/work'].get
+ unit_json = @server['/work'].post(base_params)
setup_work_unit(unit_json)
end
end
# Return output to the central server, marking the current work unit as done.
@@ -67,37 +76,45 @@
end
# Executes the current work unit, catching all exceptions as failures.
def run_work_unit
begin
- @action = CloudCrowd.actions(@action_name).new
- @action.configure(@status, @input, @options, @store)
+ @action = CloudCrowd.actions[@action_name].new(@status, @input, @options, @store)
result = case @status
when PROCESSING then @action.process
when SPLITTING then @action.split
when MERGING then @action.merge
- else raise "Work units must specify their status."
+ else raise StatusUnspecified, "work units must specify their status"
end
- complete_work_unit(result)
+ complete_work_unit({'output' => result}.to_json)
rescue Exception => e
fail_work_unit(e)
end
end
- # Wraps +run_work_unit+ to benchmark the execution time, if requested.
+ # Wraps <tt>run_work_unit</tt> to benchmark the execution time, if requested.
def run
  # Fast path: no benchmark requested, so just execute the unit and hand
  # back whatever run_work_unit returns.
  unless @options['benchmark']
    return run_work_unit
  end

  status = CloudCrowd.display_status(@status)
  # Build the message prefix before the unit runs, so it reflects the
  # instance variables as they were when the run started.
  prefix = "ran #{@action_name}/#{status} in "
  timing = Benchmark.measure { run_work_unit }
  log(prefix + timing.to_s)
end
private
- # Common parameters to send back to central, regardless of success or failure.
+ # Common parameters to send back to central.
+ def base_params
+ @base_params ||= {:enabled_actions => @enabled_actions.join(',')}
+ end
+
+ # Common parameters to send back to central upon unit completion,
+ # regardless of success or failure.
def completion_params
- {:id => @options['work_unit_id'], :time => Time.now - @start_time}
+ base_params.merge({
+ :id => @options['work_unit_id'],
+ :time => Time.now - @start_time
+ })
end
# Extract our instance variables from a WorkUnit's JSON.
def setup_work_unit(unit_json)
return false unless unit_json
@@ -114,11 +131,11 @@
# Print +message+ for the daemon log, tagged with this worker's @id so that
# output from different workers can be told apart.
def log(message)
  line = "Worker ##{@id}: #{message}"
  puts line
end
- # When we're done with a unit, clear out our ivars to make way for the next.
- # Also, remove all of the previous unit's temporary storage.
+ # When we're done with a unit, clear out our instance variables to make way
+ # for the next one. Also, remove all of the unit's temporary storage.
def clear_work_unit
  # @action can still be nil here (for example when building the Action
  # raised before it was assigned); skip the cleanup in that case instead of
  # failing with NoMethodError and leaving the other variables uncleared.
  @action.cleanup_work_directory if @action
  # Reset all per-unit state so nothing leaks into the next work unit.
  @action, @action_name, @input, @options, @start_time = nil, nil, nil, nil, nil
end
\ No newline at end of file