lib/cloud_crowd/worker.rb in cloud-crowd-0.1.0 vs lib/cloud_crowd/worker.rb in cloud-crowd-0.2.0

- old
+ new

@@ -1,92 +1,50 @@
 module CloudCrowd

-  # The Worker, run at intervals by the Daemon, fetches WorkUnits from the
-  # central server and dispatches Actions to process them. Workers only fetch
-  # units that they are able to handle (for which they have an action in their
-  # actions directory). If communication with the central server is interrupted,
-  # the WorkUnit will repeatedly attempt to complete its unit -- every
-  # Worker::RETRY_WAIT seconds. Any exceptions that take place during
+  # The Worker, forked off from the Node when a new WorkUnit is received,
+  # launches an Action for processing. Workers will only ever receive WorkUnits
+  # that they are able to handle (for which they have a corresponding action in
+  # their actions directory). If communication with the central server is
+  # interrupted, the Worker will repeatedly attempt to complete its unit --
+  # every Worker::RETRY_WAIT seconds. Any exceptions that take place during
   # the course of the Action will cause the Worker to mark the WorkUnit as
-  # having failed.
+  # having failed. When finished, the Worker's process exits, minimizing the
+  # potential for memory leaks.
   class Worker

-    # The time between worker check-ins with the central server, informing
-    # it of the current status, and simply that it's still alive.
-    CHECK_IN_INTERVAL = 60
-
     # Wait five seconds to retry, after internal communcication errors.
     RETRY_WAIT = 5

     attr_reader :action

-    # Spinning up a worker will create a new AssetStore with a persistent
-    # connection to S3. This AssetStore gets passed into each action, for use
-    # as it is run.
-    def initialize
-      @id = $$
-      @hostname = Socket.gethostname
-      @name = "#{@id}@#{@hostname}"
-      @store = AssetStore.new
-      @server = CloudCrowd.central_server
-      @enabled_actions = CloudCrowd.actions.keys
-      log 'started'
+    # A new Worker begins processing its WorkUnit straight off.
+    def initialize(node, work_unit)
+      @pid = $$
+      @node = node
+      trap_signals
+      setup_work_unit(work_unit)
+      run
     end

-    # Ask the central server for the first WorkUnit in line.
-    def fetch_work_unit
-      keep_trying_to "fetch a new work unit" do
-        unit_json = @server['/work'].post(base_params)
-        setup_work_unit(unit_json)
-      end
-    end
-
-    # Return output to the central server, marking the current work unit as done.
+    # Return output to the central server, marking the WorkUnit done.
     def complete_work_unit(result)
       keep_trying_to "complete work unit" do
-        data = completion_params.merge({:status => 'succeeded', :output => result})
-        unit_json = @server["/work/#{data[:id]}"].put(data)
+        data = base_params.merge({:status => 'succeeded', :output => result})
+        @node.server["/work/#{data[:id]}"].put(data)
         log "finished #{display_work_unit} in #{data[:time]} seconds"
-        clear_work_unit
-        setup_work_unit(unit_json)
       end
     end

-    # Mark the current work unit as failed, returning the exception to central.
+    # Mark the WorkUnit failed, returning the exception to central.
     def fail_work_unit(exception)
       keep_trying_to "mark work unit as failed" do
-        data = completion_params.merge({:status => 'failed', :output => {'output' => exception.message}.to_json})
-        unit_json = @server["/work/#{data[:id]}"].put(data)
+        data = base_params.merge({:status => 'failed', :output => {'output' => exception.message}.to_json})
+        @node.server["/work/#{data[:id]}"].put(data)
        log "failed #{display_work_unit} in #{data[:time]} seconds\n#{exception.message}\n#{exception.backtrace}"
-        clear_work_unit
-        setup_work_unit(unit_json)
       end
     end

-    # Check in with the central server. Let it know the condition of the work
-    # thread, the action and status we're processing, and our hostname and PID.
-    def check_in(thread_status)
-      keep_trying_to "check in with central" do
-        @server["/worker"].put({
-          :name => @name,
-          :thread_status => thread_status
-        })
-      end
-    end
-
-    # Inform the central server that this worker is finished. This is the only
-    # remote method that doesn't retry on connection errors -- if the worker
-    # can't connect to the central server while it's trying to shutdown, it
-    # should close, regardless.
-    def check_out
-      @server["/worker"].put({
-        :name => @name,
-        :terminated => true
-      })
-      log 'exiting'
-    end
-
     # We expect and require internal communication between the central server
     # and the workers to succeed. If it fails for any reason, log it, and then
     # keep trying the same request.
     def keep_trying_to(title)
       begin
@@ -98,70 +56,62 @@
         sleep RETRY_WAIT
         retry
       end
     end

-    # Does this Worker have a job to do?
-    def has_work?
-      @action_name && @input && @options
-    end
-
-    # Loggable string of the current work unit.
+    # Loggable details describing what the Worker is up to.
     def display_work_unit
-      "unit ##{@options['work_unit_id']} (#{@action_name})"
+      "unit ##{@options['work_unit_id']} (#{@action_name}/#{CloudCrowd.display_status(@status)})"
     end

-    # Executes the current work unit, catching all exceptions as failures.
+    # Executes the WorkUnit by running the Action, catching all exceptions as
+    # failures. We capture the thread so that we can kill it from the outside,
+    # when exiting.
     def run_work_unit
-      begin
-        result = nil
-        @action = CloudCrowd.actions[@action_name].new(@status, @input, @options, @store)
-        Dir.chdir(@action.work_directory) do
-          result = case @status
-            when PROCESSING then @action.process
-            when SPLITTING then @action.split
-            when MERGING then @action.merge
-            else raise Error::StatusUnspecified, "work units must specify their status"
+      @worker_thread = Thread.new do
+        begin
+          result = nil
+          @action = CloudCrowd.actions[@action_name].new(@status, @input, @options, @node.asset_store)
+          Dir.chdir(@action.work_directory) do
+            result = case @status
+              when PROCESSING then @action.process
+              when SPLITTING then @action.split
+              when MERGING then @action.merge
+              else raise Error::StatusUnspecified, "work units must specify their status"
+            end
           end
+          complete_work_unit({'output' => result}.to_json)
+        rescue Exception => e
+          fail_work_unit(e)
+        ensure
+          @action.cleanup_work_directory
         end
-        complete_work_unit({'output' => result}.to_json)
-      rescue Exception => e
-        fail_work_unit(e)
       end
+      @worker_thread.join
     end

-    # Wraps <tt>run_work_unit</tt> to benchmark the execution time, if requested.
+    # Wraps run_work_unit to benchmark the execution time, if requested.
     def run
       return run_work_unit unless @options['benchmark']
       status = CloudCrowd.display_status(@status)
       log("ran #{@action_name}/#{status} in " + Benchmark.measure { run_work_unit }.to_s)
     end

     private

-    # Common parameters to send back to central.
-    def base_params
-      @base_params ||= {
-        :worker_name => @name,
-        :worker_actions => @enabled_actions.join(',')
-      }
-    end
-
     # Common parameters to send back to central upon unit completion,
     # regardless of success or failure.
-    def completion_params
-      base_params.merge({
-        :id => @options['work_unit_id'],
-        :time => Time.now - @start_time
-      })
+    def base_params
+      { :pid => @pid,
+        :id => @options['work_unit_id'],
+        :time => Time.now - @start_time }
     end

-    # Extract our instance variables from a WorkUnit's JSON.
-    def setup_work_unit(unit_json)
-      return false unless unit_json
-      unit = JSON.parse(unit_json)
+    # Extract the Worker's instance variables from a WorkUnit's JSON.
+    def setup_work_unit(unit)
+      return false unless unit
       @start_time = Time.now
       @action_name, @input, @options, @status = unit['action'], unit['input'], unit['options'], unit['status']
       @options['job_id'] = unit['job_id']
       @options['work_unit_id'] = unit['id']
       @options['attempts'] ||= unit['attempts']
@@ -169,17 +119,28 @@
       return true
     end

     # Log a message to the daemon log. Includes PID for identification.
     def log(message)
-      puts "Worker ##{@id}: #{message}" unless ENV['RACK_ENV'] == 'test'
+      puts "Worker ##{@pid}: #{message}" unless ENV['RACK_ENV'] == 'test'
     end

-    # When we're done with a unit, clear out our instance variables to make way
-    # for the next one. Also, remove all of the unit's temporary storage.
-    def clear_work_unit
-      @action.cleanup_work_directory
-      @action, @action_name, @input, @options, @start_time = nil, nil, nil, nil, nil
+    # When signaled to exit, make sure that the Worker shuts down cleanly.
+    def trap_signals
+      Signal.trap('INT') { shut_down }
+      Signal.trap('KILL') { shut_down }
+      Signal.trap('TERM') { shut_down }
+    end
+
+    # Force the Worker to quit, even if it's in the middle of processing.
+    # If it had a checked-out WorkUnit, the Node should have released it on
+    # the central server already.
+    def shut_down
+      if @worker_thread
+        @worker_thread.kill
+        @worker_thread.kill! if @worker_thread.alive?
+      end
+      Process.exit
     end

   end
 end
\ No newline at end of file
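
For orientation, the sketch below shows how the new constructor might be driven from the calling side: fork a child, hand it the node and the parsed WorkUnit, and let the process exit when the unit is done. It is an illustration under stated assumptions, not code from the 0.2.0 Node: only Worker.new(node, work_unit), node.server, node.asset_store, and the unit keys read by setup_work_unit come from the diff above; the config path, server address, action name, and example values are invented for the example.

    require 'rubygems'
    require 'cloud_crowd'

    CloudCrowd.configure('config/config.yml')   # assumed config defining an actions directory

    # Stand-in for CloudCrowd::Node: the Worker only calls #server and #asset_store.
    node = Struct.new(:server, :asset_store).new(
      RestClient::Resource.new('http://localhost:9173'),  # assumed central server address
      CloudCrowd::AssetStore.new                           # assumed to still take no arguments, as in 0.1.0
    )

    # A WorkUnit, already parsed from JSON, with the keys setup_work_unit reads.
    unit = {
      'id'       => 101,
      'job_id'   => 7,
      'action'   => 'word_count',                  # assumed to exist in the actions directory
      'input'    => 'http://example.com/doc.txt',
      'options'  => { 'benchmark' => true },       # exercises the benchmarking branch of #run
      'status'   => CloudCrowd::PROCESSING,
      'attempts' => 0
    }

    # One short-lived child process per unit: the constructor traps signals,
    # stores the unit, runs it to completion, and the process then exits, so
    # memory used by the Action is returned to the OS instead of accumulating.
    pid = fork { CloudCrowd::Worker.new(node, unit) }
    Process.detach(pid)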