lib/cloud_crowd/models/job.rb in documentcloud-cloud-crowd-0.0.2 vs lib/cloud_crowd/models/job.rb in documentcloud-cloud-crowd-0.0.3
- old
+ new
@@ -1,129 +1,132 @@
-# A chunk of work that will be farmed out into many WorkUnits to be processed
-# in parallel by all the active CloudCrowd::Workers. Jobs are defined by a list
-# of inputs (usually public urls to files), an action (the name of a script that
-# CloudCrowd knows how to run), and, eventually a corresponding list of output.
-class Job < ActiveRecord::Base
- include CloudCrowd::ModelStatus
+module CloudCrowd
- has_many :work_units, :dependent => :destroy
-
- validates_presence_of :status, :inputs, :action, :options
+ # A chunk of work that will be farmed out into many WorkUnits to be processed
+ # in parallel by all the active CloudCrowd::Workers. Jobs are defined by a list
+ # of inputs (usually public urls to files), an action (the name of a script that
+ # CloudCrowd knows how to run), and, eventually a corresponding list of output.
+ class Job < ActiveRecord::Base
+ include CloudCrowd::ModelStatus
- # Create a Job from an incoming JSON or XML request, and add it to the queue.
- # TODO: Add XML support.
- def self.create_from_request(h)
- self.create(
- :inputs => h['inputs'].to_json,
- :action => h['action'],
- :options => (h['options'] || {}).to_json,
- :owner_email => h['owner_email'],
- :callback_url => h['callback_url']
- )
- end
-
- def after_create
- self.queue_for_workers(JSON.parse(self.inputs))
- end
-
- def before_validation_on_create
- self.status = self.splittable? ? CloudCrowd::SPLITTING : CloudCrowd::PROCESSING
- end
-
- # After work units are marked successful, we check to see if all of them have
- # finished, if so, this job is complete.
- def check_for_completion
- return unless all_work_units_complete?
- transition_to_next_phase
- output_list = gather_outputs_from_work_units
+ has_many :work_units, :dependent => :destroy
- if complete?
- self.outputs = output_list.to_json
- self.time = Time.now - self.created_at
+ validates_presence_of :status, :inputs, :action, :options
+
+ # Create a Job from an incoming JSON or XML request, and add it to the queue.
+ # TODO: Add XML support.
+ def self.create_from_request(h)
+ self.create(
+ :inputs => h['inputs'].to_json,
+ :action => h['action'],
+ :options => (h['options'] || {}).to_json,
+ :owner_email => h['owner_email'],
+ :callback_url => h['callback_url']
+ )
end
- self.save
- case self.status
- when CloudCrowd::PROCESSING then queue_for_workers(output_list.map {|o| JSON.parse(o) }.flatten)
- when CloudCrowd::MERGING then queue_for_workers(output_list.to_json)
- else fire_callback
+ def after_create
+ self.queue_for_workers(JSON.parse(self.inputs))
end
- self
- end
-
- # Transition from the current phase to the next one.
- def transition_to_next_phase
- self.status = any_work_units_failed? ? CloudCrowd::FAILED :
- self.splitting? ? CloudCrowd::PROCESSING :
- self.should_merge? ? CloudCrowd::MERGING :
- CloudCrowd::SUCCEEDED
- end
-
- # If a callback_url is defined, post the Job's JSON to it upon completion.
- def fire_callback
- begin
- RestClient.post(callback_url, {:job => self.to_json}) if callback_url
- rescue RestClient::Exception => e
- puts "Failed to fire job callback. Hmmm, what should happen here?"
+
+ def before_validation_on_create
+ self.status = self.splittable? ? CloudCrowd::SPLITTING : CloudCrowd::PROCESSING
end
- end
-
- # Cleaning up after a job will remove all of its files from S3.
- def cleanup
- CloudCrowd::AssetStore.new.cleanup_job(self)
- end
-
- # Have all of the WorkUnits finished? We could trade reads for writes here
- # by keeping a completed_count on the Job itself.
- def all_work_units_complete?
- self.work_units.incomplete.count <= 0
- end
-
- # Have any of the WorkUnits failed?
- def any_work_units_failed?
- self.work_units.failed.count > 0
- end
-
- def splittable?
- self.action_class.new.respond_to? :split
- end
-
- def should_merge?
- self.processing? && self.action_class.new.respond_to?(:merge)
- end
-
- def action_class
- CloudCrowd.actions(self.action)
- end
-
- def gather_outputs_from_work_units
- outs = self.work_units.complete.map {|wu| wu.output }
- self.work_units.complete.destroy_all
- outs
- end
-
- def display_status
- CloudCrowd.display_status(self.status)
- end
-
- def work_units_remaining
- self.work_units.incomplete.count
- end
-
- # A JSON representation of this job includes the statuses of its component
- # WorkUnits, as well as any completed outputs.
- def to_json(opts={})
- atts = {'id' => self.id, 'status' => self.display_status, 'work_units_remaining' => self.work_units_remaining}
- atts.merge!({'outputs' => JSON.parse(self.outputs)}) if self.outputs
- atts.merge!({'time' => self.time}) if self.time
- atts.to_json
- end
+
+ # After work units are marked successful, we check to see if all of them have
+ # finished, if so, this job is complete.
+ def check_for_completion
+ return unless all_work_units_complete?
+ transition_to_next_phase
+ output_list = gather_outputs_from_work_units
- # When starting a new job, or moving to a new stage, split up the inputs
- # into WorkUnits, and queue them.
- def queue_for_workers(input)
- [input].flatten.each do |wu_input|
- WorkUnit.create(:job => self, :input => wu_input, :status => self.status)
+ if complete?
+ self.outputs = output_list.to_json
+ self.time = Time.now - self.created_at
+ end
+ self.save
+
+ case self.status
+ when CloudCrowd::PROCESSING then queue_for_workers(output_list.map {|o| JSON.parse(o) }.flatten)
+ when CloudCrowd::MERGING then queue_for_workers(output_list.to_json)
+ else fire_callback
+ end
+ self
end
+
+ # Transition from the current phase to the next one.
+ def transition_to_next_phase
+ self.status = any_work_units_failed? ? CloudCrowd::FAILED :
+ self.splitting? ? CloudCrowd::PROCESSING :
+ self.should_merge? ? CloudCrowd::MERGING :
+ CloudCrowd::SUCCEEDED
+ end
+
+ # If a callback_url is defined, post the Job's JSON to it upon completion.
+ def fire_callback
+ begin
+ RestClient.post(callback_url, {:job => self.to_json}) if callback_url
+ rescue RestClient::Exception => e
+ puts "Failed to fire job callback. Hmmm, what should happen here?"
+ end
+ end
+
+ # Cleaning up after a job will remove all of its files from S3.
+ def cleanup
+ CloudCrowd::AssetStore.new.cleanup_job(self)
+ end
+
+ # Have all of the WorkUnits finished? We could trade reads for writes here
+ # by keeping a completed_count on the Job itself.
+ def all_work_units_complete?
+ self.work_units.incomplete.count <= 0
+ end
+
+ # Have any of the WorkUnits failed?
+ def any_work_units_failed?
+ self.work_units.failed.count > 0
+ end
+
+ def splittable?
+ self.action_class.new.respond_to? :split
+ end
+
+ def should_merge?
+ self.processing? && self.action_class.new.respond_to?(:merge)
+ end
+
+ def action_class
+ CloudCrowd.actions(self.action)
+ end
+
+ def gather_outputs_from_work_units
+ outs = self.work_units.complete.map {|wu| wu.output }
+ self.work_units.complete.destroy_all
+ outs
+ end
+
+ def display_status
+ CloudCrowd.display_status(self.status)
+ end
+
+ def work_units_remaining
+ self.work_units.incomplete.count
+ end
+
+ # A JSON representation of this job includes the statuses of its component
+ # WorkUnits, as well as any completed outputs.
+ def to_json(opts={})
+ atts = {'id' => self.id, 'status' => self.display_status, 'work_units_remaining' => self.work_units_remaining}
+ atts.merge!({'outputs' => JSON.parse(self.outputs)}) if self.outputs
+ atts.merge!({'time' => self.time}) if self.time
+ atts.to_json
+ end
+
+ # When starting a new job, or moving to a new stage, split up the inputs
+ # into WorkUnits, and queue them.
+ def queue_for_workers(input)
+ [input].flatten.each do |wu_input|
+ WorkUnit.create(:job => self, :input => wu_input, :status => self.status)
+ end
+ end
+
end
-
end
\ No newline at end of file