module CloudCrowd

  # A chunk of work that will be farmed out into many WorkUnits to be processed
  # in parallel by each active CloudCrowd::Worker. Jobs are defined by a list
  # of inputs (usually public urls to files), an action (the name of a script that
  # CloudCrowd knows how to run), and, eventually a corresponding list of output.
  class Job < ActiveRecord::Base
    include ModelStatus

    has_many :work_units, :dependent => :destroy

    validates_presence_of :status, :inputs, :action, :options

    before_validation_on_create :set_initial_status
    after_create  :queue_for_workers
    before_destroy :cleanup_assets

    # Create a Job from an incoming JSON or XML request, and add it to the queue.
    # +h+ is the parsed request hash; inputs and options are re-serialized to
    # JSON for storage.
    # TODO: Think about XML support.
    def self.create_from_request(h)
      self.create(
        :inputs       => h['inputs'].to_json,
        :action       => h['action'],
        :options      => (h['options'] || {}).to_json,
        :email        => h['email'],
        :callback_url => h['callback_url']
      )
    end

    # After work units are marked successful, we check to see if all of them have
    # finished, if so, continue on to the next phase of the job: re-queue the
    # outputs as fresh WorkUnits (post-split PROCESSING, or MERGING), or fire
    # the completion callback. Returns self.
    def check_for_completion
      return unless all_work_units_complete?
      set_next_status
      outs = gather_outputs_from_work_units
      update_attributes(:outputs => outs.to_json, :time => time_taken) if complete?
      case self.status
      when PROCESSING then queue_for_workers(outs.map {|o| JSON.parse(o) }.flatten)
      when MERGING    then queue_for_workers(outs.to_json)
      else                 fire_callback
      end
      self
    end

    # Transition this Job's status to the appropriate next status:
    # any failure fails the whole job; otherwise SPLITTING -> PROCESSING,
    # PROCESSING (with a merge step) -> MERGING, and everything else SUCCEEDED.
    def set_next_status
      update_attribute(:status,
        any_work_units_failed? ? FAILED     :
        self.splitting?        ? PROCESSING :
        self.mergeable?        ? MERGING    :
                                 SUCCEEDED
      )
    end

    # If a callback_url is defined, post the Job's JSON to it upon
    # completion. The callback_url may include HTTP basic authentication,
    # if you like:
    #   http://user:password@example.com/job_complete
    # If the callback_url is successfully pinged, we proceed to cleanup the job.
    # A failed ping is deliberately swallowed (best-effort) so it can't take
    # down the caller.
    # TODO: This should be moved into a Work Unit...
    def fire_callback
      return unless callback_url
      begin
        RestClient.post(callback_url, {:job => self.to_json})
        self.destroy
      rescue RestClient::Exception => e
        puts "Failed to fire job callback. Hmmm, what should happen here?"
      end
    end

    # Cleaning up after a job will remove all of its files from S3. Destroying
    # a Job calls cleanup_assets first.
    # TODO: Convert this into a 'cleanup' work unit that gets run by a worker.
    def cleanup_assets
      AssetStore.new.cleanup(self)
    end

    # Have all of the WorkUnits finished?
    def all_work_units_complete?
      self.work_units.incomplete.count <= 0
    end

    # Have any of the WorkUnits failed?
    def any_work_units_failed?
      self.work_units.failed.count > 0
    end

    # This job is splittable if its Action has a +split+ method.
    # (String method names: Ruby 1.8 public_instance_methods semantics.)
    def splittable?
      self.action_class.public_instance_methods.include? 'split'
    end

    # This job is mergeable if its Action has a +merge+ method, and we're
    # currently in the processing phase.
    def mergeable?
      self.processing? && self.action_class.public_instance_methods.include?('merge')
    end

    # Retrieve the class for this Job's Action, raising Error::ActionNotFound
    # if no such action has been registered with CloudCrowd.
    def action_class
      klass = CloudCrowd.actions[self.action]
      return klass if klass
      raise Error::ActionNotFound, "no action named: '#{self.action}' could be found"
    end

    # How complete is this Job, as an integer percentage?
    # Unfortunately, with the current processing sequence, the percent_complete
    # can pull a fast one and go backwards.
    def percent_complete
      return 99  if merging?
      return 100 if complete?
      total = work_units.count
      # Guard against 0/0 => NaN (NaN.round raises FloatDomainError).
      return 0 if total.zero?
      (work_units.complete.count / total.to_f * 100).round
    end

    # How long has this Job taken? Uses the recorded time once the job has
    # finished, the elapsed wall-clock time otherwise.
    def time_taken
      return self.time if self.time
      Time.now - self.created_at
    end

    # Generate a stable six-digit hex color code, based on the Job's id.
    def color
      @color ||= Digest::MD5.hexdigest(self.id.to_s)[-7...-1]
    end

    # A JSON representation of this job includes the statuses of its component
    # WorkUnits, as well as any completed outputs.
    def to_json(opts={})
      atts = {
        'id'               => id,
        'color'            => color,
        'status'           => display_status,
        'percent_complete' => percent_complete,
        'work_units'       => work_units.count,
        'time_taken'       => time_taken
      }
      atts['outputs'] = JSON.parse(outputs) if outputs
      atts['email']   = email               if email
      atts.to_json
    end


    private

    # When the WorkUnits are all finished, gather all their outputs together
    # before removing them from the database entirely. Returns the array of
    # parsed outputs. The completed scope is fetched once and reused (the
    # original ran the same query three times and left an unused local behind).
    def gather_outputs_from_work_units
      complete_units = self.work_units.complete
      outs = complete_units.map {|u| JSON.parse(u.output)['output'] }
      complete_units.destroy_all
      outs
    end

    # When starting a new job, or moving to a new stage, split up the inputs
    # into WorkUnits, and queue them. Workers will start picking them up right
    # away. +input+ defaults to the Job's own parsed inputs list.
    def queue_for_workers(input=nil)
      input ||= JSON.parse(self.inputs)
      [input].flatten.map do |wu_input|
        WorkUnit.create(
          :job    => self,
          :action => self.action,
          :input  => wu_input,
          :status => self.status
        )
      end
    end

    # A Job starts out either splitting or processing, depending on its action.
    def set_initial_status
      self.status = self.splittable? ? SPLITTING : PROCESSING
    end

  end
end