lib/cloud_crowd/worker.rb in documentcloud-cloud-crowd-0.1.0 vs lib/cloud_crowd/worker.rb in documentcloud-cloud-crowd-0.1.1

- old
+ new

@@ -8,85 +8,54 @@ # Worker::RETRY_WAIT seconds. Any exceptions that take place during # the course of the Action will cause the Worker to mark the WorkUnit as # having failed. class Worker - # The time between worker check-ins with the central server, informing - # it of the current status, and simply that it's still alive. - CHECK_IN_INTERVAL = 60 - # Wait five seconds to retry, after internal communcication errors. RETRY_WAIT = 5 attr_reader :action # Spinning up a worker will create a new AssetStore with a persistent # connection to S3. This AssetStore gets passed into each action, for use # as it is run. - def initialize - @id = $$ - @hostname = Socket.gethostname - @name = "#{@id}@#{@hostname}" - @store = AssetStore.new - @server = CloudCrowd.central_server - @enabled_actions = CloudCrowd.actions.keys - log 'started' + def initialize(node, work_unit) + Signal.trap('INT') { shut_down } + Signal.trap('KILL') { shut_down } + Signal.trap('TERM') { shut_down } + @pid = $$ + @node = node + setup_work_unit(work_unit) + run end - # Ask the central server for the first WorkUnit in line. - def fetch_work_unit - keep_trying_to "fetch a new work unit" do - unit_json = @server['/work'].post(base_params) - setup_work_unit(unit_json) - end - end + # # Ask the central server for the first WorkUnit in line. + # def fetch_work_unit + # keep_trying_to "fetch a new work unit" do + # unit_json = @server['/work'].post(base_params) + # setup_work_unit(unit_json) + # end + # end # Return output to the central server, marking the current work unit as done. def complete_work_unit(result) keep_trying_to "complete work unit" do data = completion_params.merge({:status => 'succeeded', :output => result}) - unit_json = @server["/work/#{data[:id]}"].put(data) + @node.server["/work/#{data[:id]}"].put(data) log "finished #{display_work_unit} in #{data[:time]} seconds" - clear_work_unit - setup_work_unit(unit_json) end end # Mark the current work unit as failed, returning the exception to central. def fail_work_unit(exception) keep_trying_to "mark work unit as failed" do data = completion_params.merge({:status => 'failed', :output => {'output' => exception.message}.to_json}) - unit_json = @server["/work/#{data[:id]}"].put(data) + @node.server["/work/#{data[:id]}"].put(data) log "failed #{display_work_unit} in #{data[:time]} seconds\n#{exception.message}\n#{exception.backtrace}" - clear_work_unit - setup_work_unit(unit_json) end end - # Check in with the central server. Let it know the condition of the work - # thread, the action and status we're processing, and our hostname and PID. - def check_in(thread_status) - keep_trying_to "check in with central" do - @server["/worker"].put({ - :name => @name, - :thread_status => thread_status - }) - end - end - - # Inform the central server that this worker is finished. This is the only - # remote method that doesn't retry on connection errors -- if the worker - # can't connect to the central server while it's trying to shutdown, it - # should close, regardless. - def check_out - @server["/worker"].put({ - :name => @name, - :terminated => true - }) - log 'exiting' - end - # We expect and require internal communication between the central server # and the workers to succeed. If it fails for any reason, log it, and then # keep trying the same request. def keep_trying_to(title) begin @@ -98,37 +67,35 @@ sleep RETRY_WAIT retry end end - # Does this Worker have a job to do? - def has_work? - @action_name && @input && @options - end - # Loggable string of the current work unit. def display_work_unit - "unit ##{@options['work_unit_id']} (#{@action_name})" + "unit ##{@options['work_unit_id']} (#{@action_name}/#{CloudCrowd.display_status(@status)})" end # Executes the current work unit, catching all exceptions as failures. def run_work_unit - begin - result = nil - @action = CloudCrowd.actions[@action_name].new(@status, @input, @options, @store) - Dir.chdir(@action.work_directory) do - result = case @status - when PROCESSING then @action.process - when SPLITTING then @action.split - when MERGING then @action.merge - else raise Error::StatusUnspecified, "work units must specify their status" + @worker_thread = Thread.new do + begin + result = nil + @action = CloudCrowd.actions[@action_name].new(@status, @input, @options, @node.asset_store) + Dir.chdir(@action.work_directory) do + result = case @status + when PROCESSING then @action.process + when SPLITTING then @action.split + when MERGING then @action.merge + else raise Error::StatusUnspecified, "work units must specify their status" + end end + complete_work_unit({'output' => result}.to_json) + rescue Exception => e + fail_work_unit(e) end - complete_work_unit({'output' => result}.to_json) - rescue Exception => e - fail_work_unit(e) end + @worker_thread.join end # Wraps <tt>run_work_unit</tt> to benchmark the execution time, if requested. def run return run_work_unit unless @options['benchmark'] @@ -140,12 +107,11 @@ private # Common parameters to send back to central. def base_params @base_params ||= { - :worker_name => @name, - :worker_actions => @enabled_actions.join(',') + :pid => @pid } end # Common parameters to send back to central upon unit completion, # regardless of success or failure. @@ -155,13 +121,12 @@ :time => Time.now - @start_time }) end # Extract our instance variables from a WorkUnit's JSON. - def setup_work_unit(unit_json) - return false unless unit_json - unit = JSON.parse(unit_json) + def setup_work_unit(unit) + return false unless unit @start_time = Time.now @action_name, @input, @options, @status = unit['action'], unit['input'], unit['options'], unit['status'] @options['job_id'] = unit['job_id'] @options['work_unit_id'] = unit['id'] @options['attempts'] ||= unit['attempts'] @@ -169,17 +134,28 @@ return true end # Log a message to the daemon log. Includes PID for identification. def log(message) - puts "Worker ##{@id}: #{message}" unless ENV['RACK_ENV'] == 'test' + puts "Worker ##{@pid}: #{message}" unless ENV['RACK_ENV'] == 'test' end # When we're done with a unit, clear out our instance variables to make way # for the next one. Also, remove all of the unit's temporary storage. def clear_work_unit @action.cleanup_work_directory @action, @action_name, @input, @options, @start_time = nil, nil, nil, nil, nil + end + + # Force the worker to quit, even if it's in the middle of processing. + # If it had checked out a work unit, the node should have released it on + # the central server already. + def shut_down + if @worker_thread + @worker_thread.kill + @worker_thread.kill! if @worker_thread.alive? + end + Process.exit end end end \ No newline at end of file