lib/cloud_crowd/worker.rb in documentcloud-cloud-crowd-0.1.0 vs lib/cloud_crowd/worker.rb in documentcloud-cloud-crowd-0.1.1
- old
+ new
@@ -8,85 +8,54 @@
# Worker::RETRY_WAIT seconds. Any exceptions that take place during
# the course of the Action will cause the Worker to mark the WorkUnit as
# having failed.
class Worker
- # The time between worker check-ins with the central server, informing
- # it of the current status, and simply that it's still alive.
- CHECK_IN_INTERVAL = 60
-
# Wait five seconds to retry, after internal communication errors.
RETRY_WAIT = 5
attr_reader :action
# Spinning up a worker will create a new AssetStore with a persistent
# connection to S3. This AssetStore gets passed into each action, for use
# as it is run.
- def initialize
- @id = $$
- @hostname = Socket.gethostname
- @name = "#{@id}@#{@hostname}"
- @store = AssetStore.new
- @server = CloudCrowd.central_server
- @enabled_actions = CloudCrowd.actions.keys
- log 'started'
+ def initialize(node, work_unit)
+ Signal.trap('INT') { shut_down }
+ Signal.trap('KILL') { shut_down }
+ Signal.trap('TERM') { shut_down }
+ @pid = $$
+ @node = node
+ setup_work_unit(work_unit)
+ run
end
- # Ask the central server for the first WorkUnit in line.
- def fetch_work_unit
- keep_trying_to "fetch a new work unit" do
- unit_json = @server['/work'].post(base_params)
- setup_work_unit(unit_json)
- end
- end
+ # # Ask the central server for the first WorkUnit in line.
+ # def fetch_work_unit
+ # keep_trying_to "fetch a new work unit" do
+ # unit_json = @server['/work'].post(base_params)
+ # setup_work_unit(unit_json)
+ # end
+ # end
# Return output to the central server, marking the current work unit as done.
def complete_work_unit(result)
keep_trying_to "complete work unit" do
data = completion_params.merge({:status => 'succeeded', :output => result})
- unit_json = @server["/work/#{data[:id]}"].put(data)
+ @node.server["/work/#{data[:id]}"].put(data)
log "finished #{display_work_unit} in #{data[:time]} seconds"
- clear_work_unit
- setup_work_unit(unit_json)
end
end
# Mark the current work unit as failed, returning the exception to central.
def fail_work_unit(exception)
keep_trying_to "mark work unit as failed" do
data = completion_params.merge({:status => 'failed', :output => {'output' => exception.message}.to_json})
- unit_json = @server["/work/#{data[:id]}"].put(data)
+ @node.server["/work/#{data[:id]}"].put(data)
log "failed #{display_work_unit} in #{data[:time]} seconds\n#{exception.message}\n#{exception.backtrace}"
- clear_work_unit
- setup_work_unit(unit_json)
end
end
- # Check in with the central server. Let it know the condition of the work
- # thread, the action and status we're processing, and our hostname and PID.
- def check_in(thread_status)
- keep_trying_to "check in with central" do
- @server["/worker"].put({
- :name => @name,
- :thread_status => thread_status
- })
- end
- end
-
- # Inform the central server that this worker is finished. This is the only
- # remote method that doesn't retry on connection errors -- if the worker
- # can't connect to the central server while it's trying to shutdown, it
- # should close, regardless.
- def check_out
- @server["/worker"].put({
- :name => @name,
- :terminated => true
- })
- log 'exiting'
- end
-
# We expect and require internal communication between the central server
# and the workers to succeed. If it fails for any reason, log it, and then
# keep trying the same request.
def keep_trying_to(title)
begin
@@ -98,37 +67,35 @@
sleep RETRY_WAIT
retry
end
end
- # Does this Worker have a job to do?
- def has_work?
- @action_name && @input && @options
- end
-
# Loggable string of the current work unit.
def display_work_unit
- "unit ##{@options['work_unit_id']} (#{@action_name})"
+ "unit ##{@options['work_unit_id']} (#{@action_name}/#{CloudCrowd.display_status(@status)})"
end
# Executes the current work unit, catching all exceptions as failures.
def run_work_unit
- begin
- result = nil
- @action = CloudCrowd.actions[@action_name].new(@status, @input, @options, @store)
- Dir.chdir(@action.work_directory) do
- result = case @status
- when PROCESSING then @action.process
- when SPLITTING then @action.split
- when MERGING then @action.merge
- else raise Error::StatusUnspecified, "work units must specify their status"
+ @worker_thread = Thread.new do
+ begin
+ result = nil
+ @action = CloudCrowd.actions[@action_name].new(@status, @input, @options, @node.asset_store)
+ Dir.chdir(@action.work_directory) do
+ result = case @status
+ when PROCESSING then @action.process
+ when SPLITTING then @action.split
+ when MERGING then @action.merge
+ else raise Error::StatusUnspecified, "work units must specify their status"
+ end
end
+ complete_work_unit({'output' => result}.to_json)
+ rescue Exception => e
+ fail_work_unit(e)
end
- complete_work_unit({'output' => result}.to_json)
- rescue Exception => e
- fail_work_unit(e)
end
+ @worker_thread.join
end
# Wraps <tt>run_work_unit</tt> to benchmark the execution time, if requested.
def run
return run_work_unit unless @options['benchmark']
@@ -140,12 +107,11 @@
private
# Common parameters to send back to central.
def base_params
@base_params ||= {
- :worker_name => @name,
- :worker_actions => @enabled_actions.join(',')
+ :pid => @pid
}
end
# Common parameters to send back to central upon unit completion,
# regardless of success or failure.
@@ -155,13 +121,12 @@
:time => Time.now - @start_time
})
end
# Extract our instance variables from a WorkUnit's JSON.
- def setup_work_unit(unit_json)
- return false unless unit_json
- unit = JSON.parse(unit_json)
+ def setup_work_unit(unit)
+ return false unless unit
@start_time = Time.now
@action_name, @input, @options, @status = unit['action'], unit['input'], unit['options'], unit['status']
@options['job_id'] = unit['job_id']
@options['work_unit_id'] = unit['id']
@options['attempts'] ||= unit['attempts']
@@ -169,17 +134,28 @@
return true
end
# Log a message to the daemon log. Includes PID for identification.
def log(message)
- puts "Worker ##{@id}: #{message}" unless ENV['RACK_ENV'] == 'test'
+ puts "Worker ##{@pid}: #{message}" unless ENV['RACK_ENV'] == 'test'
end
# When we're done with a unit, clear out our instance variables to make way
# for the next one. Also, remove all of the unit's temporary storage.
def clear_work_unit
@action.cleanup_work_directory
@action, @action_name, @input, @options, @start_time = nil, nil, nil, nil, nil
+ end
+
+ # Force the worker to quit, even if it's in the middle of processing.
+ # If it had checked out a work unit, the node should have released it on
+ # the central server already.
+ def shut_down
+ if @worker_thread
+ @worker_thread.kill
+ @worker_thread.kill! if @worker_thread.alive?
+ end
+ Process.exit
end
end
end
\ No newline at end of file