lib/cloud_crowd/worker.rb in documentcloud-cloud-crowd-0.0.4 vs lib/cloud_crowd/worker.rb in documentcloud-cloud-crowd-0.0.5

- old
+ new

@@ -1,26 +1,35 @@ module CloudCrowd + # The Worker, run at intervals by the Daemon, fetches WorkUnits from the + # central server and dispatches Actions to process them. Workers only fetch + # units that they are able to handle (for which they have an action in their + # actions directory). If communication with the central server is interrupted, + # the WorkUnit will repeatedly attempt to complete its unit -- every + # +worker_retry_wait+ seconds. Any exceptions that take place during + # the course of the Action will cause the Worker to mark the WorkUnit as + # having failed. class Worker attr_reader :action # Spinning up a worker will create a new AssetStore with a persistent # connection to S3. This AssetStore gets passed into each action, for use # as it is run. def initialize - @id = $$ - @hostname = Socket.gethostname - @store = AssetStore.new - @server = CloudCrowd.central_server + @id = $$ + @hostname = Socket.gethostname + @store = AssetStore.new + @server = CloudCrowd.central_server + @enabled_actions = CloudCrowd.actions.keys log 'started' end - # Ask the central server for a new WorkUnit. + # Ask the central server for the first WorkUnit in line. def fetch_work_unit keep_trying_to "fetch a new work unit" do - unit_json = @server['/work'].get + unit_json = @server['/work'].post(base_params) setup_work_unit(unit_json) end end # Return output to the central server, marking the current work unit as done. @@ -67,37 +76,45 @@ end # Executes the current work unit, catching all exceptions as failures. def run_work_unit begin - @action = CloudCrowd.actions(@action_name).new - @action.configure(@status, @input, @options, @store) + @action = CloudCrowd.actions[@action_name].new(@status, @input, @options, @store) result = case @status when PROCESSING then @action.process when SPLITTING then @action.split when MERGING then @action.merge - else raise "Work units must specify their status." + else raise StatusUnspecified, "work units must specify their status" end - complete_work_unit(result) + complete_work_unit({'output' => result}.to_json) rescue Exception => e fail_work_unit(e) end end - # Wraps +run_work_unit+ to benchmark the execution time, if requested. + # Wraps <tt>run_work_unit</tt> to benchmark the execution time, if requested. def run return run_work_unit unless @options['benchmark'] status = CloudCrowd.display_status(@status) log("ran #{@action_name}/#{status} in " + Benchmark.measure { run_work_unit }.to_s) end private - # Common parameters to send back to central, regardless of success or failure. + # Common parameters to send back to central. + def base_params + @base_params ||= {:enabled_actions => @enabled_actions.join(',')} + end + + # Common parameters to send back to central upon unit completion, + # regardless of success or failure. def completion_params - {:id => @options['work_unit_id'], :time => Time.now - @start_time} + base_params.merge({ + :id => @options['work_unit_id'], + :time => Time.now - @start_time + }) end # Extract our instance variables from a WorkUnit's JSON. def setup_work_unit(unit_json) return false unless unit_json @@ -114,11 +131,11 @@ # Log a message to the daemon log. Includes PID for identification. def log(message) puts "Worker ##{@id}: #{message}" end - # When we're done with a unit, clear out our ivars to make way for the next. - # Also, remove all of the previous unit's temporary storage. + # When we're done with a unit, clear out our instance variables to make way + # for the next one. Also, remove all of the unit's temporary storage. def clear_work_unit @action.cleanup_work_directory @action, @action_name, @input, @options, @start_time = nil, nil, nil, nil, nil end \ No newline at end of file