lib/cloud_crowd/models/work_unit.rb in cloud-crowd-0.1.0 vs lib/cloud_crowd/models/work_unit.rb in cloud-crowd-0.2.0
- old
+ new
@@ -6,74 +6,128 @@
# are each run as a single WorkUnit.
class WorkUnit < ActiveRecord::Base
include ModelStatus
belongs_to :job
- belongs_to :worker_record
+ belongs_to :node_record
validates_presence_of :job_id, :status, :input, :action
- after_save :check_for_job_completion
+ # Available WorkUnits are waiting to be distributed to Nodes for processing.
+ named_scope :available, {:conditions => {:reservation => nil, :worker_pid => nil, :status => INCOMPLETE}}
+ # Reserved WorkUnits have been marked for distribution by a central server process.
+ named_scope :reserved, {:conditions => {:reservation => $$}, :order => 'updated_at asc'}
- # Find the first available WorkUnit in the queue, and take it out.
- # +enabled_actions+ must be passed to whitelist the types of WorkUnits than
- # can be retrieved for processing. Optionally, specify the +offset+ to peek
- # further on in line.
- def self.dequeue(worker_name, enabled_actions=[], offset=0)
- unit = self.first(
- :conditions => {:status => INCOMPLETE, :worker_record_id => nil, :action => enabled_actions},
- :order => "created_at asc",
- :offset => offset
- )
- unit ? unit.assign_to(worker_name) : nil
+ # Attempt to send a list of work_units to nodes with available capacity.
+ # A single central server process stops the same WorkUnit from being
+ # distributed to multiple nodes by reserving it first. The algorithm used
+ # should be lock-free.
+ def self.distribute_to_nodes
+ return unless WorkUnit.reserve_available
+ work_units = WorkUnit.reserved
+ available_nodes = NodeRecord.available
+ until work_units.empty? do
+ node = available_nodes.shift
+ unit = work_units.first
+ break unless node
+ next unless node.actions.include? unit.action
+ sent = node.send_work_unit(unit)
+ if sent
+ work_units.shift
+ available_nodes.push(node) unless node.busy?
+ end
+ end
+ ensure
+ WorkUnit.cancel_reservations
end
- # After saving a WorkUnit, its Job should check if it just became complete.
- def check_for_job_completion
- self.job.check_for_completion if complete?
+ # Reserves all available WorkUnits for this process. Returns false if there
+ # were none available.
+ def self.reserve_available
+ WorkUnit.available.update_all("reservation = #{$$}") > 0
end
+ # Cancels all outstanding WorkUnit reservations for this process.
+ def self.cancel_reservations
+ WorkUnit.reserved.update_all('reservation = null')
+ end
+
+ # Look up a WorkUnit by the worker that's currently processing it. Specified
+ # by <tt>pid@host</tt>.
+ def self.find_by_worker_name(name)
+ pid, host = name.split('@')
+ node = NodeRecord.find_by_host(host)
+ node && node.work_units.find_by_worker_pid(pid)
+ end
+
+ # Convenience method for starting a new WorkUnit.
+ def self.start(job, action, input, status)
+ self.create(:job => job, :action => action, :input => input, :status => status)
+ end
+
# Mark this unit as having finished successfully.
- def finish(output, time_taken)
- update_attributes({
- :status => SUCCEEDED,
- :worker_record => nil,
- :attempts => self.attempts + 1,
- :output => output,
- :time => time_taken
- })
+ # Splitting work units are handled differently (an optimization) -- they
+ # immediately fire off all of their resulting WorkUnits for processing,
+ # without waiting for the rest of their splitting cousins to complete.
+ def finish(result, time_taken)
+ if splitting?
+ [JSON.parse(parsed_output(result))].flatten.each do |new_input|
+ WorkUnit.start(job, action, new_input, PROCESSING)
+ end
+ self.destroy
+ job.set_next_status if job.done_splitting?
+ else
+ update_attributes({
+ :status => SUCCEEDED,
+ :node_record => nil,
+ :worker_pid => nil,
+ :attempts => attempts + 1,
+ :output => result,
+ :time => time_taken
+ })
+ job.check_for_completion
+ end
end
# Mark this unit as having failed. May attempt a retry.
def fail(output, time_taken)
tries = self.attempts + 1
return try_again if tries < CloudCrowd.config[:work_unit_retries]
update_attributes({
:status => FAILED,
- :worker_record => nil,
+ :node_record => nil,
+ :worker_pid => nil,
:attempts => tries,
:output => output,
:time => time_taken
})
+ self.job.check_for_completion
end
# Ever tried. Ever failed. No matter. Try again. Fail again. Fail better.
def try_again
update_attributes({
- :worker_record => nil,
- :attempts => self.attempts + 1
+ :node_record => nil,
+ :worker_pid => nil,
+ :attempts => self.attempts + 1
})
end
- # When a Worker checks out a WorkUnit, establish the connection between
- # WorkUnit and WorkerRecord.
- def assign_to(worker_name)
- self.worker_record = WorkerRecord.find_by_name!(worker_name)
- self.save ? self : nil
+ # When a Node checks out a WorkUnit, establish the connection between
+ # WorkUnit and NodeRecord and record the worker_pid.
+ def assign_to(node_record, worker_pid)
+ update_attributes!(:node_record => node_record, :worker_pid => worker_pid)
end
+ # All output needs to be wrapped in a JSON object for consistency
+ # (unfortunately, JSON.parse needs the top-level to be an object or array).
+ # Convenience method to provide the parsed version.
+ def parsed_output(out = self.output)
+ JSON.parse(out)['output']
+ end
+
# The JSON representation of a WorkUnit shares the Job's options with all
- # its sister WorkUnits.
+ # its cousin WorkUnits.
def to_json
{
'id' => self.id,
'job_id' => self.job_id,
'input' => self.input,