lib/cloud_crowd/worker.rb in documentcloud-cloud-crowd-0.0.5 vs lib/cloud_crowd/worker.rb in documentcloud-cloud-crowd-0.0.6

- old
+ new

@@ -3,23 +3,31 @@ # The Worker, run at intervals by the Daemon, fetches WorkUnits from the # central server and dispatches Actions to process them. Workers only fetch # units that they are able to handle (for which they have an action in their # actions directory). If communication with the central server is interrupted, # the WorkUnit will repeatedly attempt to complete its unit -- every - # +worker_retry_wait+ seconds. Any exceptions that take place during + # Worker::RETRY_WAIT seconds. Any exceptions that take place during # the course of the Action will cause the Worker to mark the WorkUnit as # having failed. class Worker + + # The time between worker check-ins with the central server, informing + # it of the current status, and simply that it's still alive. + CHECK_IN_INTERVAL = 60 + + # Wait five seconds to retry, after internal communcication errors. + RETRY_WAIT = 5 attr_reader :action # Spinning up a worker will create a new AssetStore with a persistent # connection to S3. This AssetStore gets passed into each action, for use # as it is run. def initialize @id = $$ @hostname = Socket.gethostname + @name = "#{@id}@#{@hostname}" @store = AssetStore.new @server = CloudCrowd.central_server @enabled_actions = CloudCrowd.actions.keys log 'started' end @@ -35,48 +43,75 @@ # Return output to the central server, marking the current work unit as done. def complete_work_unit(result) keep_trying_to "complete work unit" do data = completion_params.merge({:status => 'succeeded', :output => result}) unit_json = @server["/work/#{data[:id]}"].put(data) - log "finished #{@action_name} in #{data[:time]} seconds" + log "finished #{display_work_unit} in #{data[:time]} seconds" clear_work_unit setup_work_unit(unit_json) end end # Mark the current work unit as failed, returning the exception to central. def fail_work_unit(exception) keep_trying_to "mark work unit as failed" do - data = completion_params.merge({:status => 'failed', :output => exception.message}) + data = completion_params.merge({:status => 'failed', :output => {'output' => exception.message}.to_json}) unit_json = @server["/work/#{data[:id]}"].put(data) - log "failed #{@action_name} in #{data[:time]} seconds\n#{exception.message}\n#{exception.backtrace}" + log "failed #{display_work_unit} in #{data[:time]} seconds\n#{exception.message}\n#{exception.backtrace}" clear_work_unit setup_work_unit(unit_json) end end + # Check in with the central server. Let it know the condition of the work + # thread, the action and status we're processing, and our hostname and PID. + def check_in(thread_status) + keep_trying_to "check in with central" do + @server["/worker"].put({ + :name => @name, + :thread_status => thread_status + }) + end + end + + # Inform the central server that this worker is finished. This is the only + # remote method that doesn't retry on connection errors -- if the worker + # can't connect to the central server while it's trying to shutdown, it + # should close, regardless. + def check_out + @server["/worker"].put({ + :name => @name, + :terminated => true + }) + log 'exiting' + end + # We expect and require internal communication between the central server # and the workers to succeed. If it fails for any reason, log it, and then # keep trying the same request. def keep_trying_to(title) begin yield rescue Exception => e - wait_time = CloudCrowd.config[:worker_retry_wait] - log "failed to #{title} -- retry in #{wait_time} seconds" + log "failed to #{title} -- retry in #{RETRY_WAIT} seconds" log e.message log e.backtrace - sleep wait_time + sleep RETRY_WAIT retry end end # Does this Worker have a job to do? def has_work? @action_name && @input && @options end + # Loggable string of the current work unit. + def display_work_unit + "unit ##{@options['work_unit_id']} (#{@action_name})" + end + # Executes the current work unit, catching all exceptions as failures. def run_work_unit begin @action = CloudCrowd.actions[@action_name].new(@status, @input, @options, @store) result = case @status @@ -101,11 +136,14 @@ private # Common parameters to send back to central. def base_params - @base_params ||= {:enabled_actions => @enabled_actions.join(',')} + @base_params ||= { + :worker_name => @name, + :worker_actions => @enabled_actions.join(',') + } end # Common parameters to send back to central upon unit completion, # regardless of success or failure. def completion_params @@ -122,16 +160,16 @@ @start_time = Time.now @action_name, @input, @options, @status = unit['action'], unit['input'], unit['options'], unit['status'] @options['job_id'] = unit['job_id'] @options['work_unit_id'] = unit['id'] @options['attempts'] ||= unit['attempts'] - log "fetched work unit for #{@action_name}" + log "fetched #{display_work_unit}" return true end # Log a message to the daemon log. Includes PID for identification. def log(message) - puts "Worker ##{@id}: #{message}" + puts "Worker ##{@id}: #{message}" unless ENV['RACK_ENV'] == 'test' end # When we're done with a unit, clear out our instance variables to make way # for the next one. Also, remove all of the unit's temporary storage. def clear_work_unit \ No newline at end of file