lib/cloud_crowd/models/job.rb in documentcloud-cloud-crowd-0.1.1 vs lib/cloud_crowd/models/job.rb in documentcloud-cloud-crowd-0.2.0
- old
+ new
@@ -31,16 +31,14 @@
# finished, if so, continue on to the next phase of the job.
def check_for_completion
return unless all_work_units_complete?
set_next_status
outs = gather_outputs_from_work_units
- update_attributes(:outputs => outs.to_json, :time => time_taken) if complete?
-
- case self.status
- when PROCESSING then queue_for_workers(outs.map {|o| JSON.parse(o) }.flatten)
- when MERGING then queue_for_workers(outs.to_json)
- else fire_callback
+ return queue_for_workers(outs) if merging?
+ if complete?
+ update_attributes(:outputs => outs, :time => time_taken)
+ fire_callback if callback_url
end
self
end
# Transition this Job's status to the appropriate next status.
@@ -58,11 +56,10 @@
# if you like:
# http://user:password@example.com/job_complete
# If the callback_url is successfully pinged, we proceed to cleanup the job.
# TODO: This should be moved into a Work Unit...
def fire_callback
- return unless callback_url
begin
RestClient.post(callback_url, {:job => self.to_json})
self.destroy
rescue RestClient::Exception => e
puts "Failed to fire job callback. Hmmm, what should happen here?"
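
The callback is a plain form POST with the serialized job in the :job parameter (see the RestClient.post call above). A minimal sketch of a receiver, assuming Sinatra and an arbitrary /job_complete path; the fields read from the parsed job are assumptions about what Job#to_json includes:

require 'rubygems'
require 'sinatra'
require 'json'

# Accept CloudCrowd's completion ping and acknowledge it with a 200,
# which lets the job clean itself up on the other end.
post '/job_complete' do
  job = JSON.parse(params[:job])
  puts "Job #{job['id']} finished with outputs: #{job['outputs']}"
  200
end
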
@@ -89,25 +86,32 @@
# This job is splittable if its Action has a +split+ method.
def splittable?
self.action_class.public_instance_methods.include? 'split'
end

+ # This job is done splitting if it's finished with its splitting work units.
+ def done_splitting?
+ splittable? && work_units.splitting.count <= 0
+ end
+
# This job is mergeable if its Action has a +merge+ method.
def mergeable?
self.processing? && self.action_class.public_instance_methods.include?('merge')
end
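
To make splittable?, done_splitting?, and mergeable? concrete, here is a hypothetical action, not taken from the gem, that defines all three phases; the exact semantics of split and merge inputs/outputs are assumptions about the Action API:

class WordCount < CloudCrowd::Action

  # Hypothetical split phase: break one big input into per-line inputs,
  # each of which becomes its own processing WorkUnit.
  def split
    input.split("\n")
  end

  # Each processing WorkUnit counts the words in its own chunk.
  def process
    input.split(/\s+/).size
  end

  # Hypothetical merge phase: fold the collected processing outputs
  # (assumed to arrive as an array) into a single total.
  def merge
    input.inject(0) {|sum, count| sum + count }
  end

end

Because WordCount responds to both split and merge, a job running it starts out SPLITTING, and mergeable? holds once it reaches PROCESSING.
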
# Retrieve the class for this Job's Action.
def action_class
- klass = CloudCrowd.actions[self.action]
- return klass if klass
+ @action_class ||= CloudCrowd.actions[self.action]
+ return @action_class if @action_class
raise Error::ActionNotFound, "no action named: '#{self.action}' could be found"
end

# How complete is this Job?
# Unfortunately, with the current processing sequence, the percent_complete
- # can pull a fast one and go backwards.
+ # can pull a fast one and go backwards. This happens when there's a single
+ # large input that takes a long time to split, and when it finally does it
+ # creates a whole swarm of work units. This seems unavoidable.
def percent_complete
return 99 if merging?
return 100 if complete?
(work_units.complete.count / work_units.count.to_f * 100).round
end
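
For example, with made-up counts of 3 complete WorkUnits out of 12 total:

  (3 / 12.to_f * 100).round   #=> 25

while a merging job always reports 99 and a complete one 100, so the figure only goes backwards when a split suddenly adds new units to the denominator.
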
@@ -141,30 +145,24 @@
private

# When the WorkUnits are all finished, gather all their outputs together
- # before removing them from the database entirely.
+ # before removing them from the database entirely. Returns their merged JSON.
def gather_outputs_from_work_units
units = self.work_units.complete
- outs = self.work_units.complete.map {|u| JSON.parse(u.output)['output'] }
+ outs = self.work_units.complete.map {|u| u.parsed_output }
self.work_units.complete.destroy_all
- outs
+ outs.to_json
end
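
A sketch of the new return value with made-up outputs: where 0.1.1 handed back a Ruby array, 0.2.0 returns it already serialized, so three completed units whose parsed_output values are "a.txt", "b.txt", and "c.txt" yield a single JSON string:

  require 'json'
  ["a.txt", "b.txt", "c.txt"].to_json   #=> '["a.txt","b.txt","c.txt"]'
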
# When starting a new job, or moving to a new stage, split up the inputs
# into WorkUnits, and queue them. Workers will start picking them up right
# away.
def queue_for_workers(input=nil)
input ||= JSON.parse(self.inputs)
- [input].flatten.map do |wu_input|
- WorkUnit.create(
- :job => self,
- :action => self.action,
- :input => wu_input,
- :status => self.status
- )
- end
+ [input].flatten.each {|i| WorkUnit.start(self, action, i, status) }
+ self
end
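
With made-up inputs, the [input].flatten step above is what lets the same method serve both phases: a parsed inputs array fans out into one processing unit apiece, while the merged-JSON string passed in from check_for_completion stays a single merge unit.

  [["a.jpg", "b.jpg"]].flatten   #=> ["a.jpg", "b.jpg"]   two processing units
  ['["out1","out2"]'].flatten    #=> ['["out1","out2"]']  one merge unit
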
# A Job starts out either splitting or processing, depending on its action.
def set_initial_status
self.status = self.splittable? ? SPLITTING : PROCESSING
\ No newline at end of file