lib/cloud_crowd/models/job.rb in documentcloud-cloud-crowd-0.1.0 vs lib/cloud_crowd/models/job.rb in documentcloud-cloud-crowd-0.1.1
- old
+ new
@@ -29,50 +29,56 @@
# After work units are marked successful, we check to see if all of them have
# finished; if so, we continue on to the next phase of the job.
def check_for_completion
return unless all_work_units_complete?
- transition_to_next_phase
- output_list = gather_outputs_from_work_units
+ set_next_status
+ outs = gather_outputs_from_work_units
+ update_attributes(:outputs => outs.to_json, :time => time_taken) if complete?
- if complete?
- self.outputs = output_list.to_json
- self.time = Time.now - self.created_at
- end
- self.save
-
case self.status
- when PROCESSING then queue_for_workers(output_list.map {|o| JSON.parse(o) }.flatten)
- when MERGING then queue_for_workers(output_list.to_json)
+ when PROCESSING then queue_for_workers(outs.map {|o| JSON.parse(o) }.flatten)
+ when MERGING then queue_for_workers(outs.to_json)
else fire_callback
end
self
end
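
To make the dispatch above concrete, here is a small stand-alone illustration (plain Ruby, no ActiveRecord) of how the two queueing branches shape the next round of inputs. The sample output strings are invented; only the map/flatten and to_json calls mirror the case statement in check_for_completion.

    require 'json'

    # Sample of what gather_outputs_from_work_units might return after the
    # split phase: one JSON string per completed unit (values invented).
    outs = ['["page_1.txt","page_2.txt"]', '["page_3.txt"]']

    # PROCESSING branch: parse every unit's output and flatten, so each
    # element becomes the input of its own new work unit.
    outs.map { |o| JSON.parse(o) }.flatten
    # => ["page_1.txt", "page_2.txt", "page_3.txt"]

    # MERGING branch (which in reality sees the processing phase's outputs):
    # the whole list is serialized once and handed to a single merge unit.
    outs.to_json
    # => a single JSON array containing every unit's output string
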
+ # Transition this Job's status to the appropriate next status.
+ def set_next_status
+ update_attribute(:status,
+   any_work_units_failed? ? FAILED     :
+   self.splitting?        ? PROCESSING :
+   self.mergeable?        ? MERGING    :
+                            SUCCEEDED
+ )
+ end
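
The cascade reads as a short table of transitions; the sketch below restates it as a plain function (the constant values and helper name are made up, standing in for CloudCrowd's status constants). Note also the behavioural shift visible in the diff: update_attribute persists the new status immediately (skipping validations), whereas the removed transition_to_next_phase only assigned it and relied on the later self.save call.

    # Illustration only: the same decision cascade as a plain function.
    FAILED, PROCESSING, MERGING, SUCCEEDED = 1, 2, 3, 4

    def next_status(failed, splitting, mergeable)
      return FAILED     if failed     # any failed unit fails the whole job
      return PROCESSING if splitting  # split outputs become processing units
      return MERGING    if mergeable  # processed outputs feed a merge unit
      SUCCEEDED
    end

    next_status(false, true,  true)   # => PROCESSING
    next_status(false, false, true)   # => MERGING
    next_status(false, false, false)  # => SUCCEEDED
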
+
# If a <tt>callback_url</tt> is defined, post the Job's JSON to it upon
# completion. The <tt>callback_url</tt> may include HTTP basic authentication,
# if you like:
# http://user:password@example.com/job_complete
+ # If the callback_url is successfully pinged, we proceed to clean up the job.
+ # TODO: This should be moved into a Work Unit...
def fire_callback
+ return unless callback_url
begin
- RestClient.post(callback_url, {:job => self.to_json}) if callback_url
+ RestClient.post(callback_url, {:job => self.to_json})
+ self.destroy
rescue RestClient::Exception => e
puts "Failed to fire job callback. Hmmm, what should happen here?"
end
end
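
For reference, the receiving side of that POST can be as small as the hypothetical Sinatra endpoint below. None of this is part of CloudCrowd: the /job_complete path echoes the comment's example URL, the :job parameter name comes from the RestClient.post call above, and the 'id'/'status' keys are assumptions about what Job#to_json exposes. Note that in 0.1.1 a successful POST also destroys the job, which (per the comment below) removes its S3 files as well.

    require 'sinatra'
    require 'json'

    # Hypothetical callback receiver: the gem POSTs the job as a form field
    # named :job containing the job's JSON representation.
    post '/job_complete' do
      job = JSON.parse(params[:job])
      # 'id' and 'status' are assumptions about the serialized job.
      puts "Job #{job['id']} finished with status #{job['status']}"
      'ok'
    end
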
# Cleaning up after a job will remove all of its files from S3. Destroying
# a Job calls cleanup_assets first.
+ # TODO: Convert this into a 'cleanup' work unit that gets run by a worker.
def cleanup_assets
AssetStore.new.cleanup(self)
end
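
The "Destroying a Job calls cleanup_assets first" behaviour implies a destroy callback on the model; it is not visible in this hunk, but the wiring is presumably along these lines (sketch, not the gem's actual declaration):

    # Presumed wiring, not shown in this diff: a destroy callback ensures the
    # S3 files are removed before the database row goes away.
    before_destroy :cleanup_assets
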
# Have all of the WorkUnits finished?
- #--
- # We could trade reads for writes here
- # by keeping a completed_count on the Job itself.
- #++
def all_work_units_complete?
self.work_units.incomplete.count <= 0
end
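
The removed note above ("trade reads for writes by keeping a completed_count on the Job") points at a counter-cache style alternative. A sketch of that idea, with entirely hypothetical completed_count and total_units_count columns:

    # Sketch only: bump a counter as each unit finishes, so the completion
    # check becomes an integer comparison instead of a COUNT query.
    def mark_unit_complete!
      increment!(:completed_count)
    end

    def all_work_units_complete?
      completed_count >= total_units_count
    end
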
# Have any of the WorkUnits failed?
@@ -96,14 +102,15 @@
return klass if klass
raise Error::ActionNotFound, "no action named: '#{self.action}' could be found"
end
# How complete is this Job?
+ # Unfortunately, with the current processing sequence, the percent_complete
+ # figure can go backwards when the job moves from one phase to the next.
def percent_complete
- return 0 if splitting?
- return 100 if complete?
return 99 if merging?
+ return 100 if complete?
(work_units.complete.count / work_units.count.to_f * 100).round
end
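
The backwards jump the comment warns about comes from bookkeeping visible elsewhere in this diff: finished units are destroyed when a phase ends (gather_outputs_from_work_units) and a fresh batch is queued (queue_for_workers), so the completed/total ratio resets. With made-up unit counts:

    # The moment the lone split unit is marked complete: 1 of 1 units done.
    (1 / 1.to_f * 100).round    # => 100
    # Right after its output fans out into 10 new processing units: 0 of 10.
    (0 / 10.to_f * 100).round   # => 0
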
# How long has this Job taken?
def time_taken
@@ -141,24 +148,16 @@
units = self.work_units.complete
outs = self.work_units.complete.map {|u| JSON.parse(u.output)['output'] }
self.work_units.complete.destroy_all
outs
end
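
For context, each completed unit's output column holds a JSON envelope and only its 'output' entry is kept; the destroy_all call means a phase's outputs are consumed exactly once. The envelope below is a guess at the stored shape (only the 'output' key is confirmed by the code above).

    require 'json'

    # Hypothetical stored value for one completed unit.
    stored = '{"status":"succeeded","output":"s3://bucket/doc_1/page_1.txt"}'
    JSON.parse(stored)['output']   # => "s3://bucket/doc_1/page_1.txt"
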
-
- # Transition this Job's status to the appropriate next status.
- def transition_to_next_phase
- self.status = any_work_units_failed? ? FAILED     :
-               self.splitting?        ? PROCESSING :
-               self.mergeable?        ? MERGING    :
-                                        SUCCEEDED
- end
# When starting a new job, or moving to a new stage, split up the inputs
# into WorkUnits, and queue them. Workers will start picking them up right
# away.
def queue_for_workers(input=nil)
input ||= JSON.parse(self.inputs)
- [input].flatten.each do |wu_input|
+ [input].flatten.map do |wu_input|
WorkUnit.create(
:job => self,
:action => self.action,
:input => wu_input,
:status => self.status
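
The listing is cut off just above, so only the start of the WorkUnit.create call is visible. As a usage illustration (made-up inputs), the fan-out works on whatever JSON array the job was created with, or on the outputs handed over by check_for_completion; the switch from each to map also means the method now returns the created work units rather than the raw input array.

    require 'json'

    # Made-up job inputs: three documents to process, stored as a JSON array.
    inputs = '["http://example.com/a.pdf","http://example.com/b.pdf","http://example.com/c.pdf"]'

    # queue_for_workers parses the array, then wraps and flattens it, so a
    # single input and a list of inputs are handled the same way; each element
    # becomes one WorkUnit.create call.
    [JSON.parse(inputs)].flatten
    # => ["http://example.com/a.pdf", "http://example.com/b.pdf", "http://example.com/c.pdf"]
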