lib/cloud_crowd/models/job.rb in documentcloud-cloud-crowd-0.1.0 vs lib/cloud_crowd/models/job.rb in documentcloud-cloud-crowd-0.1.1

- old
+ new

@@ -29,50 +29,56 @@
     # After work units are marked successful, we check to see if all of them have
     # finished, if so, continue on to the next phase of the job.
    def check_for_completion
      return unless all_work_units_complete?
-     transition_to_next_phase
-     output_list = gather_outputs_from_work_units
+     set_next_status
+     outs = gather_outputs_from_work_units
+     update_attributes(:outputs => outs.to_json, :time => time_taken) if complete?
-     if complete?
-       self.outputs = output_list.to_json
-       self.time = Time.now - self.created_at
-     end
-     self.save
      case self.status
-     when PROCESSING then queue_for_workers(output_list.map {|o| JSON.parse(o) }.flatten)
-     when MERGING    then queue_for_workers(output_list.to_json)
+     when PROCESSING then queue_for_workers(outs.map {|o| JSON.parse(o) }.flatten)
+     when MERGING    then queue_for_workers(outs.to_json)
      else            fire_callback
      end
      self
    end

+   # Transition this Job's status to the appropriate next status.
+   def set_next_status
+     update_attribute(:status,
+       any_work_units_failed? ? FAILED :
+       self.splitting?        ? PROCESSING :
+       self.mergeable?        ? MERGING :
+                                SUCCEEDED
+     )
+   end
+
    # If a <tt>callback_url</tt> is defined, post the Job's JSON to it upon
    # completion. The <tt>callback_url</tt> may include HTTP basic authentication,
    # if you like:
    #   http://user:password@example.com/job_complete
+   # If the callback_url is successfully pinged, we proceed to cleanup the job.
+   # TODO: This should be moved into a Work Unit...
    def fire_callback
+     return unless callback_url
      begin
-       RestClient.post(callback_url, {:job => self.to_json}) if callback_url
+       RestClient.post(callback_url, {:job => self.to_json})
+       self.destroy
      rescue RestClient::Exception => e
        puts "Failed to fire job callback. Hmmm, what should happen here?"
      end
    end

    # Cleaning up after a job will remove all of its files from S3. Destroying
    # a Job calls cleanup_assets first.
+   # TODO: Convert this into a 'cleanup' work unit that gets run by a worker.
    def cleanup_assets
      AssetStore.new.cleanup(self)
    end

    # Have all of the WorkUnits finished?
-   #--
-   # We could trade reads for writes here
-   # by keeping a completed_count on the Job itself.
-   #++
    def all_work_units_complete?
      self.work_units.incomplete.count <= 0
    end

    # Have any of the WorkUnits failed?

@@ -96,14 +102,15 @@
      return klass if klass
      raise Error::ActionNotFound, "no action named: '#{self.action}' could be found"
    end

    # How complete is this Job?
+   # Unfortunately, with the current processing sequence, the percent_complete
+   # can pull a fast one and go backwards.
    def percent_complete
-     return 0 if splitting?
-     return 100 if complete?
      return 99 if merging?
+     return 100 if complete?
      (work_units.complete.count / work_units.count.to_f * 100).round
    end

    # How long has this Job taken?
    def time_taken

@@ -141,24 +148,16 @@
      units = self.work_units.complete
      outs = self.work_units.complete.map {|u| JSON.parse(u.output)['output'] }
      self.work_units.complete.destroy_all
      outs
    end
-
-   # Transition this Job's status to the appropriate next status.
-   def transition_to_next_phase
-     self.status = any_work_units_failed? ? FAILED :
-       self.splitting? ? PROCESSING :
-       self.mergeable? ? MERGING :
-       SUCCEEDED
-   end

    # When starting a new job, or moving to a new stage, split up the inputs
    # into WorkUnits, and queue them. Workers will start picking them up right
    # away.
    def queue_for_workers(input=nil)
      input ||= JSON.parse(self.inputs)
-     [input].flatten.each do |wu_input|
+     [input].flatten.map do |wu_input|
        WorkUnit.create(
          :job    => self,
          :action => self.action,
          :input  => wu_input,
          :status => self.status
\ No newline at end of file
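
A note on the last hunk: switching [input].flatten.each to [input].flatten.map also changes what queue_for_workers returns. With each, the caller gets back the flattened input array; with map, it gets the values the block produced, i.e. the freshly created WorkUnit records. The following is a runnable illustration of that each/map difference in plain Ruby only; the strings stand in for WorkUnit records and none of the gem's classes are involved:

    # each returns its receiver unchanged; map collects the block's results.
    inputs = ['part1.pdf', 'part2.pdf']

    from_each = inputs.each {|i| "unit for #{i}" }
    from_map  = inputs.map  {|i| "unit for #{i}" }

    puts from_each.inspect  # ["part1.pdf", "part2.pdf"]
    puts from_map.inspect   # ["unit for part1.pdf", "unit for part2.pdf"]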
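
Further back, the reworked fire_callback posts the job's JSON to callback_url as a form field named job (that is what RestClient.post does with a hash body) and, once the callback answers without raising, destroys the job. A minimal sketch of a receiving endpoint follows; the Sinatra app and the /job_complete route are illustrative assumptions, not part of the gem:

    require 'sinatra'
    require 'json'

    # Hypothetical receiver for the callback POST shown in the fire_callback hunk.
    # RestClient.post sends a form-encoded body, so the JSON arrives in params[:job].
    post '/job_complete' do
      job = JSON.parse(params[:job])
      # The available keys depend on what Job#to_json serializes; log them for now.
      puts "Callback received for job: #{job.inspect}"
      status 200
    end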