lib/cloud_crowd/worker.rb in documentcloud-cloud-crowd-0.1.1 vs lib/cloud_crowd/worker.rb in documentcloud-cloud-crowd-0.2.0

- old
+ new

@@ -1,56 +1,45 @@ module CloudCrowd - # The Worker, run at intervals by the Daemon, fetches WorkUnits from the - # central server and dispatches Actions to process them. Workers only fetch - # units that they are able to handle (for which they have an action in their - # actions directory). If communication with the central server is interrupted, - # the WorkUnit will repeatedly attempt to complete its unit -- every - # Worker::RETRY_WAIT seconds. Any exceptions that take place during + # The Worker, forked off from the Node when a new WorkUnit is received, + # launches an Action for processing. Workers will only ever receive WorkUnits + # that they are able to handle (for which they have a corresponding action in + # their actions directory). If communication with the central server is + # interrupted, the Worker will repeatedly attempt to complete its unit -- + # every Worker::RETRY_WAIT seconds. Any exceptions that take place during # the course of the Action will cause the Worker to mark the WorkUnit as - # having failed. + # having failed. When finished, the Worker's process exits, minimizing the + # potential for memory leaks. class Worker # Wait five seconds to retry, after internal communcication errors. RETRY_WAIT = 5 attr_reader :action - # Spinning up a worker will create a new AssetStore with a persistent - # connection to S3. This AssetStore gets passed into each action, for use - # as it is run. + # A new Worker begins processing its WorkUnit straight off. def initialize(node, work_unit) - Signal.trap('INT') { shut_down } - Signal.trap('KILL') { shut_down } - Signal.trap('TERM') { shut_down } @pid = $$ @node = node + trap_signals setup_work_unit(work_unit) run end - # # Ask the central server for the first WorkUnit in line. - # def fetch_work_unit - # keep_trying_to "fetch a new work unit" do - # unit_json = @server['/work'].post(base_params) - # setup_work_unit(unit_json) - # end - # end - - # Return output to the central server, marking the current work unit as done. + # Return output to the central server, marking the WorkUnit done. def complete_work_unit(result) keep_trying_to "complete work unit" do - data = completion_params.merge({:status => 'succeeded', :output => result}) + data = base_params.merge({:status => 'succeeded', :output => result}) @node.server["/work/#{data[:id]}"].put(data) log "finished #{display_work_unit} in #{data[:time]} seconds" end end - # Mark the current work unit as failed, returning the exception to central. + # Mark the WorkUnit failed, returning the exception to central. def fail_work_unit(exception) keep_trying_to "mark work unit as failed" do - data = completion_params.merge({:status => 'failed', :output => {'output' => exception.message}.to_json}) + data = base_params.merge({:status => 'failed', :output => {'output' => exception.message}.to_json}) @node.server["/work/#{data[:id]}"].put(data) log "failed #{display_work_unit} in #{data[:time]} seconds\n#{exception.message}\n#{exception.backtrace}" end end @@ -67,16 +56,18 @@ sleep RETRY_WAIT retry end end - # Loggable string of the current work unit. + # Loggable details describing what the Worker is up to. def display_work_unit "unit ##{@options['work_unit_id']} (#{@action_name}/#{CloudCrowd.display_status(@status)})" end - # Executes the current work unit, catching all exceptions as failures. + # Executes the WorkUnit by running the Action, catching all exceptions as + # failures. We capture the thread so that we can kill it from the outside, + # when exiting. def run_work_unit @worker_thread = Thread.new do begin result = nil @action = CloudCrowd.actions[@action_name].new(@status, @input, @options, @node.asset_store) @@ -89,42 +80,36 @@ end end complete_work_unit({'output' => result}.to_json) rescue Exception => e fail_work_unit(e) + ensure + @action.cleanup_work_directory end end @worker_thread.join end - # Wraps <tt>run_work_unit</tt> to benchmark the execution time, if requested. + # Wraps run_work_unit to benchmark the execution time, if requested. def run return run_work_unit unless @options['benchmark'] status = CloudCrowd.display_status(@status) log("ran #{@action_name}/#{status} in " + Benchmark.measure { run_work_unit }.to_s) end private - # Common parameters to send back to central. - def base_params - @base_params ||= { - :pid => @pid - } - end - # Common parameters to send back to central upon unit completion, # regardless of success or failure. - def completion_params - base_params.merge({ - :id => @options['work_unit_id'], - :time => Time.now - @start_time - }) + def base_params + { :pid => @pid, + :id => @options['work_unit_id'], + :time => Time.now - @start_time } end - # Extract our instance variables from a WorkUnit's JSON. + # Extract the Worker's instance variables from a WorkUnit's JSON. def setup_work_unit(unit) return false unless unit @start_time = Time.now @action_name, @input, @options, @status = unit['action'], unit['input'], unit['options'], unit['status'] @options['job_id'] = unit['job_id'] @@ -137,18 +122,18 @@ # Log a message to the daemon log. Includes PID for identification. def log(message) puts "Worker ##{@pid}: #{message}" unless ENV['RACK_ENV'] == 'test' end - # When we're done with a unit, clear out our instance variables to make way - # for the next one. Also, remove all of the unit's temporary storage. - def clear_work_unit - @action.cleanup_work_directory - @action, @action_name, @input, @options, @start_time = nil, nil, nil, nil, nil + # When signaled to exit, make sure that the Worker shuts down cleanly. + def trap_signals + Signal.trap('INT') { shut_down } + Signal.trap('KILL') { shut_down } + Signal.trap('TERM') { shut_down } end - # Force the worker to quit, even if it's in the middle of processing. - # If it had checked out a work unit, the node should have released it on + # Force the Worker to quit, even if it's in the middle of processing. + # If it had a checked-out WorkUnit, the Node should have released it on # the central server already. def shut_down if @worker_thread @worker_thread.kill @worker_thread.kill! if @worker_thread.alive? \ No newline at end of file