module CloudCrowd
# A chunk of work that will be farmed out into many WorkUnits to be processed
# in parallel by each active CloudCrowd::Worker. Jobs are defined by a list
# of inputs (usually public URLs to files), an action (the name of a script that
# CloudCrowd knows how to run), and, eventually, a corresponding list of outputs.
class Job < ActiveRecord::Base
include ModelStatus
CLEANUP_GRACE_PERIOD = 7 # In days. That's a week.
has_many :work_units, :dependent => :destroy
validates_presence_of :status, :inputs, :action, :options
# Set initial status
# A Job starts out either splitting or processing, depending on its action.
before_validation(:on => :create) do
self.status = self.splittable? ? SPLITTING : PROCESSING
end
after_create :queue_for_workers
before_destroy :cleanup_assets
# Jobs that were last updated more than N days ago.
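# e.g. Job.older_than(7) matches jobs that haven't been touched in a week.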
scope :older_than, lambda {|num| {:conditions => ['updated_at < ?', num.days.ago]} }
# Create a Job from an incoming JSON request, and add it to the queue.
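# A sketch of a call (the field values are hypothetical, and 'word_count'
# stands in for whichever actions your installation defines):
#
#   Job.create_from_request(
#     'inputs'       => ['http://example.com/1.txt', 'http://example.com/2.txt'],
#     'action'       => 'word_count',
#     'email'        => 'admin@example.com',
#     'callback_url' => 'http://example.com/job_complete'
#   )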
def self.create_from_request(h)
self.create(
:inputs => h['inputs'].to_json,
:action => h['action'],
:options => (h['options'] || {}).to_json,
:email => h['email'],
:callback_url => h['callback_url']
)
end
# Clean up all jobs beyond a certain age.
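# e.g. Job.cleanup_all(:days => 30) destroys completed jobs that are more
# than a month old.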
def self.cleanup_all(opts = {})
days = opts[:days] || CLEANUP_GRACE_PERIOD
self.complete.older_than(days).find_in_batches(:batch_size => 100) do |jobs|
jobs.each {|job| job.destroy }
end
end
# After a work unit is marked successful, check whether all of the units
# have finished; if so, continue on to the next phase of the job.
def check_for_completion
return unless all_work_units_complete?
set_next_status
outs = gather_outputs_from_work_units
return queue_for_workers([outs]) if merging?
if complete?
update_attributes(:outputs => outs, :time => time_taken)
puts "Job ##{id} (#{action}) #{display_status}." unless ENV['RACK_ENV'] == 'test'
Thread.new { fire_callback } if callback_url
end
self
end
# Transition this Job's current status to the appropriate next one, based
# on the state of the WorkUnits and the nature of the Action.
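# The transitions, in order of precedence: any failed unit fails the whole
# job; a splitting job moves on to processing; a processing job with a
# merge step moves on to merging; otherwise the job has succeeded.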
def set_next_status
update_attribute(:status,
any_work_units_failed? ? FAILED :
self.splitting? ? PROCESSING :
self.mergeable? ? MERGING :
SUCCEEDED
)
end
# If a callback_url is defined, post the Job's JSON to it upon
# completion. The callback_url may include HTTP basic authentication,
# if you like:
# http://user:password@example.com/job_complete
# If the callback URL returns a '201 Created' HTTP status code, CloudCrowd
# will assume that the resource has been successfully created, and the Job
# will be cleaned up.
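# A sketch of a receiving endpoint, assuming a Sinatra app on the other
# end (the route and the save_results helper are hypothetical):
#
#   post '/job_complete' do
#     job = JSON.parse(params[:job])
#     save_results(job['id'], job['outputs'])
#     status 201  # a 201 tells CloudCrowd the Job may be destroyed
#   end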
def fire_callback
begin
response = RestClient.post(callback_url, {:job => self.to_json})
Thread.new { self.destroy } if response && response.code == 201
rescue RestClient::Exception => e
puts "Job ##{id} (#{action}) failed to fire callback: #{callback_url}"
end
end
# Cleaning up after a job will remove all of its files from S3 or the
# filesystem. Destroying a Job will cleanup_assets first. Run this in a
# separate thread to get out of the transaction's way.
# TODO: Convert this into a 'cleanup' work unit that gets run by a worker.
def cleanup_assets
AssetStore.new.cleanup(self)
end
# Have all of the WorkUnits finished?
def all_work_units_complete?
self.work_units.incomplete.count <= 0
end
# Have any of the WorkUnits failed?
def any_work_units_failed?
self.work_units.failed.count > 0
end
# This job is splittable if its Action has a +split+ method.
def splittable?
self.action_class.public_instance_methods.map {|m| m.to_sym }.include? :split
end
# This job is done splitting if it's finished with its splitting work units.
def done_splitting?
splittable? && work_units.splitting.count <= 0
end
# This job is mergeable if its Action has a +merge+ method.
def mergeable?
self.processing? && self.action_class.public_instance_methods.map {|m| m.to_sym }.include?(:merge)
end
# Retrieve the class for this Job's Action.
def action_class
@action_class ||= CloudCrowd.actions[self.action]
return @action_class if @action_class
raise Error::ActionNotFound, "no action named: '#{self.action}' could be found"
end
# How complete is this Job?
# Unfortunately, with the current processing sequence, the percent_complete
# can pull a fast one and go backwards. This happens when there's a single
# large input that takes a long time to split; when it finally does, it
# creates a whole swarm of work units, so the ratio of complete to total
# units (and with it the percentage) drops. This seems unavoidable.
def percent_complete
return 99 if merging?
return 100 if complete?
unit_count = work_units.count
return 100 if unit_count <= 0
(work_units.complete.count / unit_count.to_f * 100).round
end
# How long has this Job taken?
def time_taken
return self.time if self.time
Time.now - self.created_at
end
# Generate a stable six-digit hex color code, based on the Job's id.
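# e.g. prefix it with '#' for a CSS color: "#" + job.color.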
def color
@color ||= Digest::MD5.hexdigest(self.id.to_s)[-7...-1]
end
# A JSON representation of this job includes the statuses of its component
# WorkUnits, as well as any completed outputs.
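# A sample rendering (all values are hypothetical; 'outputs' and 'email'
# appear only when present):
#
#   { 'id'               => 42,
#     'color'            => '3fa9b2',
#     'status'           => 'processing',
#     'percent_complete' => 75,
#     'work_units'       => 4,
#     'time_taken'       => 18.4 }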
def as_json(opts={})
atts = {
'id' => id,
'color' => color,
'status' => display_status,
'percent_complete' => percent_complete,
'work_units' => work_units.count,
'time_taken' => time_taken
}
atts['outputs'] = JSON.parse(outputs) if outputs
atts['email'] = email if email
atts
end
private
# When the WorkUnits are all finished, gather all their outputs together
# before removing them from the database entirely. Returns their merged JSON.
def gather_outputs_from_work_units
units = self.work_units.complete
outs  = units.map {|u| u.parsed_output }
units.destroy_all
outs.to_json
end
# When starting a new job, or moving to a new stage, split up the inputs
# into WorkUnits, and queue them. Workers will start picking them up right
# away.
def queue_for_workers(input=nil)
input ||= JSON.parse(self.inputs)
input.each {|i| WorkUnit.start(self, action, i, status) }
self
end
end
end