lib/cloud_crowd/models/job.rb in documentcloud-cloud-crowd-0.1.1 vs lib/cloud_crowd/models/job.rb in documentcloud-cloud-crowd-0.2.0

- old
+ new

@@ -31,16 +31,14 @@ # finished, if so, continue on to the next phase of the job. def check_for_completion return unless all_work_units_complete? set_next_status outs = gather_outputs_from_work_units - update_attributes(:outputs => outs.to_json, :time => time_taken) if complete? - - case self.status - when PROCESSING then queue_for_workers(outs.map {|o| JSON.parse(o) }.flatten) - when MERGING then queue_for_workers(outs.to_json) - else fire_callback + return queue_for_workers(outs) if merging? + if complete? + update_attributes(:outputs => outs, :time => time_taken) + fire_callback if callback_url end self end # Transition this Job's status to the appropriate next status. @@ -58,11 +56,10 @@ # if you like: # http://user:password@example.com/job_complete # If the callback_url is successfully pinged, we proceed to cleanup the job. # TODO: This should be moved into a Work Unit... def fire_callback - return unless callback_url begin RestClient.post(callback_url, {:job => self.to_json}) self.destroy rescue RestClient::Exception => e puts "Failed to fire job callback. Hmmm, what should happen here?" @@ -89,25 +86,32 @@ # This job is splittable if its Action has a +split+ method. def splittable? self.action_class.public_instance_methods.include? 'split' end + # This job is done splitting if it's finished with its splitting work units. + def done_splitting? + splittable? && work_units.splitting.count <= 0 + end + # This job is mergeable if its Action has a +merge+ method. def mergeable? self.processing? && self.action_class.public_instance_methods.include?('merge') end # Retrieve the class for this Job's Action. def action_class - klass = CloudCrowd.actions[self.action] - return klass if klass + @action_class ||= CloudCrowd.actions[self.action] + return @action_class if @action_class raise Error::ActionNotFound, "no action named: '#{self.action}' could be found" end # How complete is this Job? # Unfortunately, with the current processing sequence, the percent_complete - # can pull a fast one and go backwards. + # can pull a fast one and go backwards. This happens when there's a single + # large input that takes a long time to split, and when it finally does it + # creates a whole swarm of work units. This seems unavoidable. def percent_complete return 99 if merging? return 100 if complete? (work_units.complete.count / work_units.count.to_f * 100).round end @@ -141,30 +145,24 @@ private # When the WorkUnits are all finished, gather all their outputs together - # before removing them from the database entirely. + # before removing them from the database entirely. Returns their merged JSON. def gather_outputs_from_work_units units = self.work_units.complete - outs = self.work_units.complete.map {|u| JSON.parse(u.output)['output'] } + outs = self.work_units.complete.map {|u| u.parsed_output } self.work_units.complete.destroy_all - outs + outs.to_json end # When starting a new job, or moving to a new stage, split up the inputs # into WorkUnits, and queue them. Workers will start picking them up right # away. def queue_for_workers(input=nil) input ||= JSON.parse(self.inputs) - [input].flatten.map do |wu_input| - WorkUnit.create( - :job => self, - :action => self.action, - :input => wu_input, - :status => self.status - ) - end + [input].flatten.each {|i| WorkUnit.start(self, action, i, status) } + self end # A Job starts out either splitting or processing, depending on its action. def set_initial_status self.status = self.splittable? ? SPLITTING : PROCESSING \ No newline at end of file