lib/cloud_crowd/worker.rb in documentcloud-cloud-crowd-0.0.3 vs lib/cloud_crowd/worker.rb in documentcloud-cloud-crowd-0.0.4

- old
+ new

@@ -1,22 +1,19 @@ module CloudCrowd class Worker - - CENTRAL_URL = CloudCrowd.config[:central_server] - RETRY_WAIT = CloudCrowd.config[:worker_retry_wait] - + attr_reader :action # Spinning up a worker will create a new AssetStore with a persistent # connection to S3. This AssetStore gets passed into each action, for use # as it is run. def initialize @id = $$ @hostname = Socket.gethostname - @store = CloudCrowd::AssetStore.new - @server = central_server_resource + @store = AssetStore.new + @server = CloudCrowd.central_server log 'started' end # Ask the central server for a new WorkUnit. def fetch_work_unit @@ -46,18 +43,22 @@ clear_work_unit setup_work_unit(unit_json) end end + # We expect and require internal communication between the central server + # and the workers to succeed. If it fails for any reason, log it, and then + # keep trying the same request. def keep_trying_to(title) begin yield rescue Exception => e - log "failed to #{title} -- retry in #{RETRY_WAIT} seconds" + wait_time = CloudCrowd.config[:worker_retry_wait] + log "failed to #{title} -- retry in #{wait_time} seconds" log e.message log e.backtrace - sleep RETRY_WAIT + sleep wait_time retry end end # Does this Worker have a job to do? @@ -69,13 +70,13 @@ def run_work_unit begin @action = CloudCrowd.actions(@action_name).new @action.configure(@status, @input, @options, @store) result = case @status - when CloudCrowd::PROCESSING then @action.process - when CloudCrowd::SPLITTING then @action.split - when CloudCrowd::MERGING then @action.merge + when PROCESSING then @action.process + when SPLITTING then @action.split + when MERGING then @action.merge else raise "Work units must specify their status." end complete_work_unit(result) rescue Exception => e fail_work_unit(e) @@ -89,17 +90,9 @@ log("ran #{@action_name}/#{status} in " + Benchmark.measure { run_work_unit }.to_s) end private - - # Keep an authenticated (if configured to enable authentication) resource - # for the central server. - def central_server_resource - params = [CENTRAL_URL] - params += [CloudCrowd.config[:login], CloudCrowd.config[:password]] if CloudCrowd.config[:use_http_authentication] - RestClient::Resource.new(*params) - end # Common parameters to send back to central, regardless of success or failure. def completion_params {:id => @options['work_unit_id'], :time => Time.now - @start_time} end \ No newline at end of file