lib/cloud_crowd/models/job.rb in documentcloud-cloud-crowd-0.0.4 vs lib/cloud_crowd/models/job.rb in documentcloud-cloud-crowd-0.0.5
- old
+ new
@@ -1,40 +1,34 @@
module CloudCrowd
# A chunk of work that will be farmed out into many WorkUnits to be processed
- # in parallel by all the active CloudCrowd::Workers. Jobs are defined by a list
+ # in parallel by each active CloudCrowd::Worker. Jobs are defined by a list
# of inputs (usually public urls to files), an action (the name of a script that
# CloudCrowd knows how to run), and, eventually a corresponding list of output.
class Job < ActiveRecord::Base
include ModelStatus
has_many :work_units, :dependent => :destroy
validates_presence_of :status, :inputs, :action, :options
+
+ before_validation_on_create :set_initial_status
+ after_create :queue_for_workers
+ before_destroy :cleanup
# Create a Job from an incoming JSON or XML request, and add it to the queue.
- # TODO: Add XML support.
+ # TODO: Think about XML support.
def self.create_from_request(h)
self.create(
:inputs => h['inputs'].to_json,
:action => h['action'],
:options => (h['options'] || {}).to_json,
:owner_email => h['owner_email'],
:callback_url => h['callback_url']
)
end
- # Creating a job creates its corresponding work units, adding them
- # to the queue.
- def after_create
- self.queue_for_workers(JSON.parse(self.inputs))
- end
-
- def before_validation_on_create
- self.status = self.splittable? ? SPLITTING : PROCESSING
- end
-
# After work units are marked successful, we check to see if all of them have
# finished, if so, continue on to the next phase of the job.
def check_for_completion
return unless all_work_units_complete?
transition_to_next_phase
@@ -52,34 +46,33 @@
else fire_callback
end
self
end
- # Transition this Job's status to the following one.
- def transition_to_next_phase
- self.status = any_work_units_failed? ? FAILED :
- self.splitting? ? PROCESSING :
- self.mergeable? ? MERGING :
- SUCCEEDED
- end
-
- # If a callback_url is defined, post the Job's JSON to it upon completion.
+ # If a <tt>callback_url</tt> is defined, post the Job's JSON to it upon
+ # completion. The <tt>callback_url</tt> may include HTTP basic authentication,
+ # if you like:
+ # http://user:password@example.com/job_complete
def fire_callback
begin
RestClient.post(callback_url, {:job => self.to_json}) if callback_url
rescue RestClient::Exception => e
puts "Failed to fire job callback. Hmmm, what should happen here?"
end
end
- # Cleaning up after a job will remove all of its files from S3.
+ # Cleaning up after a job will remove all of its files from S3. Destroying
+ # a Job calls cleanup first.
def cleanup
AssetStore.new.cleanup_job(self)
end
- # Have all of the WorkUnits finished? We could trade reads for writes here
+ # Have all of the WorkUnits finished?
+ #--
+ # We could trade reads for writes here
# by keeping a completed_count on the Job itself.
+ #++
def all_work_units_complete?
self.work_units.incomplete.count <= 0
end
# Have any of the WorkUnits failed?
@@ -95,23 +88,18 @@
# This job is mergeable if its Action has a +merge+ method.
def mergeable?
self.processing? && self.action_class.public_instance_methods.include?('merge')
end
- # Retrieve the class for this Job's Action, loading it if necessary.
+ # Retrieve the class for this Job's Action.
def action_class
- CloudCrowd.actions(self.action)
+ klass = CloudCrowd.actions[self.action]
+ return klass if klass
+ raise ActionNotFound, "no action named: '#{self.action}' could be found"
end
- # When the WorkUnits are all finished, gather all their outputs together
- # before removing them from the database entirely.
- def gather_outputs_from_work_units
- outs = self.work_units.complete.map {|wu| wu.output }
- self.work_units.complete.destroy_all
- outs
- end
-
+ # Get the displayable status name of the Job's status code.
def display_status
CloudCrowd.display_status(self.status)
end
# How complete is this Job?
@@ -120,24 +108,73 @@
return 100 if complete?
return 99 if merging?
(work_units.complete.count / work_units.count.to_f * 100).round
end
+ # How long has this Job taken?
+ def time_taken
+ return self.time if self.time
+ Time.now - self.created_at
+ end
+
+ # Generate a stable 8-bit Hex color code, based on the Job's id.
+ def color
+ @color ||= Digest::MD5.hexdigest(self.id.to_s)[-7...-1]
+ end
+
# A JSON representation of this job includes the statuses of its component
# WorkUnits, as well as any completed outputs.
def to_json(opts={})
- atts = {'id' => self.id, 'status' => self.display_status, 'percent_complete' => self.percent_complete}
+ atts = {
+ 'id' => self.id,
+ 'color' => self.color,
+ 'status' => self.display_status,
+ 'percent_complete' => self.percent_complete,
+ 'work_units' => self.work_units.count,
+ 'time_taken' => self.time_taken
+ }
atts.merge!({'outputs' => JSON.parse(self.outputs)}) if self.outputs
- atts.merge!({'time' => self.time}) if self.time
atts.to_json
end
+
+
+ private
+
+ # When the WorkUnits are all finished, gather all their outputs together
+ # before removing them from the database entirely.
+ def gather_outputs_from_work_units
+ units = self.work_units.complete
+ outs = self.work_units.complete.map {|u| JSON.parse(u.output)['output'] }
+ self.work_units.complete.destroy_all
+ outs
+ end
+
+ # Transition this Job's status to the appropriate next status.
+ def transition_to_next_phase
+ self.status = any_work_units_failed? ? FAILED :
+ self.splitting? ? PROCESSING :
+ self.mergeable? ? MERGING :
+ SUCCEEDED
+ end
# When starting a new job, or moving to a new stage, split up the inputs
- # into WorkUnits, and queue them.
- def queue_for_workers(input)
+ # into WorkUnits, and queue them. Workers will start picking them up right
+ # away.
+ def queue_for_workers(input=nil)
+ input ||= JSON.parse(self.inputs)
[input].flatten.each do |wu_input|
- WorkUnit.create(:job => self, :input => wu_input, :status => self.status)
+ WorkUnit.create(
+ :job => self,
+ :action => self.action,
+ :input => wu_input,
+ :status => self.status
+ )
end
+ end
+
+ # A Job starts out either splitting or processing, depending on its action.
+ def set_initial_status
+ self.status = self.splittable? ? SPLITTING : PROCESSING
end
end
end
\ No newline at end of file