lib/cloud_crowd/worker.rb in documentcloud-cloud-crowd-0.0.3 vs lib/cloud_crowd/worker.rb in documentcloud-cloud-crowd-0.0.4
- old
+ new
@@ -1,22 +1,19 @@
module CloudCrowd
class Worker
-
- CENTRAL_URL = CloudCrowd.config[:central_server]
- RETRY_WAIT = CloudCrowd.config[:worker_retry_wait]
-
+
attr_reader :action
# Spinning up a worker will create a new AssetStore with a persistent
# connection to S3. This AssetStore gets passed into each action, for use
# as it is run.
def initialize
@id = $$
@hostname = Socket.gethostname
- @store = CloudCrowd::AssetStore.new
- @server = central_server_resource
+ @store = AssetStore.new
+ @server = CloudCrowd.central_server
log 'started'
end
# Ask the central server for a new WorkUnit.
def fetch_work_unit
@@ -46,18 +43,22 @@
clear_work_unit
setup_work_unit(unit_json)
end
end
+ # We expect and require internal communication between the central server
+ # and the workers to succeed. If it fails for any reason, log it, and then
+ # keep trying the same request.
def keep_trying_to(title)
begin
yield
rescue Exception => e
- log "failed to #{title} -- retry in #{RETRY_WAIT} seconds"
+ wait_time = CloudCrowd.config[:worker_retry_wait]
+ log "failed to #{title} -- retry in #{wait_time} seconds"
log e.message
log e.backtrace
- sleep RETRY_WAIT
+ sleep wait_time
retry
end
end
# Does this Worker have a job to do?
@@ -69,13 +70,13 @@
def run_work_unit
begin
@action = CloudCrowd.actions(@action_name).new
@action.configure(@status, @input, @options, @store)
result = case @status
- when CloudCrowd::PROCESSING then @action.process
- when CloudCrowd::SPLITTING then @action.split
- when CloudCrowd::MERGING then @action.merge
+ when PROCESSING then @action.process
+ when SPLITTING then @action.split
+ when MERGING then @action.merge
else raise "Work units must specify their status."
end
complete_work_unit(result)
rescue Exception => e
fail_work_unit(e)
@@ -89,17 +90,9 @@
log("ran #{@action_name}/#{status} in " + Benchmark.measure { run_work_unit }.to_s)
end
private
-
- # Keep an authenticated (if configured to enable authentication) resource
- # for the central server.
- def central_server_resource
- params = [CENTRAL_URL]
- params += [CloudCrowd.config[:login], CloudCrowd.config[:password]] if CloudCrowd.config[:use_http_authentication]
- RestClient::Resource.new(*params)
- end
# Common parameters to send back to central, regardless of success or failure.
def completion_params
{:id => @options['work_unit_id'], :time => Time.now - @start_time}
end
\ No newline at end of file