lib/cloud_crowd/models/work_unit.rb in cloud-crowd-0.1.0 vs lib/cloud_crowd/models/work_unit.rb in cloud-crowd-0.2.0

- old
+ new

@@ -6,74 +6,128 @@ # are each run as a single WorkUnit. class WorkUnit < ActiveRecord::Base include ModelStatus belongs_to :job - belongs_to :worker_record + belongs_to :node_record validates_presence_of :job_id, :status, :input, :action - after_save :check_for_job_completion + # Available WorkUnits are waiting to be distributed to Nodes for processing. + named_scope :available, {:conditions => {:reservation => nil, :worker_pid => nil, :status => INCOMPLETE}} + # Reserved WorkUnits have been marked for distribution by a central server process. + named_scope :reserved, {:conditions => {:reservation => $$}, :order => 'updated_at asc'} - # Find the first available WorkUnit in the queue, and take it out. - # +enabled_actions+ must be passed to whitelist the types of WorkUnits than - # can be retrieved for processing. Optionally, specify the +offset+ to peek - # further on in line. - def self.dequeue(worker_name, enabled_actions=[], offset=0) - unit = self.first( - :conditions => {:status => INCOMPLETE, :worker_record_id => nil, :action => enabled_actions}, - :order => "created_at asc", - :offset => offset - ) - unit ? unit.assign_to(worker_name) : nil + # Attempt to send a list of work_units to nodes with available capacity. + # A single central server process stops the same WorkUnit from being + # distributed to multiple nodes by reserving it first. The algorithm used + # should be lock-free. + def self.distribute_to_nodes + return unless WorkUnit.reserve_available + work_units = WorkUnit.reserved + available_nodes = NodeRecord.available + until work_units.empty? do + node = available_nodes.shift + unit = work_units.first + break unless node + next unless node.actions.include? unit.action + sent = node.send_work_unit(unit) + if sent + work_units.shift + available_nodes.push(node) unless node.busy? + end + end + ensure + WorkUnit.cancel_reservations end - # After saving a WorkUnit, its Job should check if it just became complete. - def check_for_job_completion - self.job.check_for_completion if complete? + # Reserves all available WorkUnits for this process. Returns false if there + # were none available. + def self.reserve_available + WorkUnit.available.update_all("reservation = #{$$}") > 0 end + # Cancels all outstanding WorkUnit reservations for this process. + def self.cancel_reservations + WorkUnit.reserved.update_all('reservation = null') + end + + # Look up a WorkUnit by the worker that's currently processing it. Specified + # by <tt>pid@host</tt>. + def self.find_by_worker_name(name) + pid, host = name.split('@') + node = NodeRecord.find_by_host(host) + node && node.work_units.find_by_worker_pid(pid) + end + + # Convenience method for starting a new WorkUnit. + def self.start(job, action, input, status) + self.create(:job => job, :action => action, :input => input, :status => status) + end + # Mark this unit as having finished successfully. - def finish(output, time_taken) - update_attributes({ - :status => SUCCEEDED, - :worker_record => nil, - :attempts => self.attempts + 1, - :output => output, - :time => time_taken - }) + # Splitting work units are handled differently (an optimization) -- they + # immediately fire off all of their resulting WorkUnits for processing, + # without waiting for the rest of their splitting cousins to complete. + def finish(result, time_taken) + if splitting? + [JSON.parse(parsed_output(result))].flatten.each do |new_input| + WorkUnit.start(job, action, new_input, PROCESSING) + end + self.destroy + job.set_next_status if job.done_splitting? + else + update_attributes({ + :status => SUCCEEDED, + :node_record => nil, + :worker_pid => nil, + :attempts => attempts + 1, + :output => result, + :time => time_taken + }) + job.check_for_completion + end end # Mark this unit as having failed. May attempt a retry. def fail(output, time_taken) tries = self.attempts + 1 return try_again if tries < CloudCrowd.config[:work_unit_retries] update_attributes({ :status => FAILED, - :worker_record => nil, + :node_record => nil, + :worker_pid => nil, :attempts => tries, :output => output, :time => time_taken }) + self.job.check_for_completion end # Ever tried. Ever failed. No matter. Try again. Fail again. Fail better. def try_again update_attributes({ - :worker_record => nil, - :attempts => self.attempts + 1 + :node_record => nil, + :worker_pid => nil, + :attempts => self.attempts + 1 }) end - # When a Worker checks out a WorkUnit, establish the connection between - # WorkUnit and WorkerRecord. - def assign_to(worker_name) - self.worker_record = WorkerRecord.find_by_name!(worker_name) - self.save ? self : nil + # When a Node checks out a WorkUnit, establish the connection between + # WorkUnit and NodeRecord and record the worker_pid. + def assign_to(node_record, worker_pid) + update_attributes!(:node_record => node_record, :worker_pid => worker_pid) end + # All output needs to be wrapped in a JSON object for consistency + # (unfortunately, JSON.parse needs the top-level to be an object or array). + # Convenience method to provide the parsed version. + def parsed_output(out = self.output) + JSON.parse(out)['output'] + end + # The JSON representation of a WorkUnit shares the Job's options with all - # its sister WorkUnits. + # its cousin WorkUnits. def to_json { 'id' => self.id, 'job_id' => self.job_id, 'input' => self.input,