lib/cloud_crowd/models/work_unit.rb in cloud-crowd-0.3.3 vs lib/cloud_crowd/models/work_unit.rb in cloud-crowd-0.4.0

- old
+ new

@@ -4,98 +4,100 @@ # through a single action. The WorkUnits are run in parallel, with each worker # daemon processing one at a time. The splitting and merging stages of a job # are each run as a single WorkUnit. class WorkUnit < ActiveRecord::Base include ModelStatus - + # We use a random number in (0...MAX_RESERVATION) to reserve work units. # The size of the maximum signed integer in MySQL -- SQLite has no limit. MAX_RESERVATION = 2147483647 - + # We only reserve a certain number of WorkUnits in a single go, to avoid # reserving the entire table. RESERVATION_LIMIT = 25 - + belongs_to :job belongs_to :node_record - + validates_presence_of :job_id, :status, :input, :action - + # Available WorkUnits are waiting to be distributed to Nodes for processing. named_scope :available, {:conditions => {:reservation => nil, :worker_pid => nil, :status => INCOMPLETE}} # Reserved WorkUnits have been marked for distribution by a central server process. - named_scope :reserved, lambda {|reservation| + named_scope :reserved, lambda {|reservation| {:conditions => {:reservation => reservation}, :order => 'updated_at asc'} } - + # Attempt to send a list of WorkUnits to nodes with available capacity. # A single central server process stops the same WorkUnit from being # distributed to multiple nodes by reserving it first. The algorithm used # should be lock-free. # # We reserve WorkUnits for this process in chunks of RESERVATION_LIMIT size, - # and try to match them to Nodes that are capable of handling the Action. - # WorkUnits get removed from the availability list when they are - # successfully sent, and Nodes get removed when they are busy or have the + # and try to match them to Nodes that are capable of handling the Action. + # WorkUnits get removed from the availability list when they are + # successfully sent, and Nodes get removed when they are busy or have the # action in question disabled. def self.distribute_to_nodes reservation = nil loop do return unless reservation = WorkUnit.reserve_available(:limit => RESERVATION_LIMIT) work_units = WorkUnit.reserved(reservation) available_nodes = NodeRecord.available while node = available_nodes.shift and unit = work_units.shift do - if node.actions.include? unit.action + if node.actions.include?(unit.action) if node.send_work_unit(unit) available_nodes.push(node) unless node.busy? next end + else + unit.cancel_reservation end work_units.push(unit) end return if work_units.any? || available_nodes.empty? end ensure WorkUnit.cancel_reservations(reservation) if reservation end - - # Reserves all available WorkUnits for this process. Returns false if there + + # Reserves all available WorkUnits for this process. Returns false if there # were none available. def self.reserve_available(options={}) reservation = ActiveSupport::SecureRandom.random_number(MAX_RESERVATION) any = WorkUnit.available.update_all("reservation = #{reservation}", nil, options) > 0 any && reservation end - + # Cancels all outstanding WorkUnit reservations for this process. def self.cancel_reservations(reservation) WorkUnit.reserved(reservation).update_all('reservation = null') end - + # Cancels all outstanding WorkUnit reservations for all processes. (Useful # in the console for debugging.) def self.cancel_all_reservations WorkUnit.update_all('reservation = null') end - + # Look up a WorkUnit by the worker that's currently processing it. Specified # by <tt>pid@host</tt>. def self.find_by_worker_name(name) pid, host = name.split('@') node = NodeRecord.find_by_host(host) node && node.work_units.find_by_worker_pid(pid) end - + # Convenience method for starting a new WorkUnit. def self.start(job, action, input, status) input = input.to_json unless input.is_a? String self.create(:job => job, :action => action, :input => input, :status => status) end - + # Mark this unit as having finished successfully. - # Splitting work units are handled differently (an optimization) -- they - # immediately fire off all of their resulting WorkUnits for processing, + # Splitting work units are handled differently (an optimization) -- they + # immediately fire off all of their resulting WorkUnits for processing, # without waiting for the rest of their splitting cousins to complete. def finish(result, time_taken) if splitting? [parsed_output(result)].flatten.each do |new_input| WorkUnit.start(job, action, new_input, PROCESSING) @@ -112,11 +114,11 @@ :time => time_taken }) job && job.check_for_completion end end - + # Mark this unit as having failed. May attempt a retry. def fail(output, time_taken) tries = self.attempts + 1 return try_again if tries < CloudCrowd.config[:work_unit_retries] update_attributes({ @@ -127,33 +129,38 @@ :output => output, :time => time_taken }) job && job.check_for_completion end - + # Ever tried. Ever failed. No matter. Try again. Fail again. Fail better. def try_again update_attributes({ :node_record => nil, :worker_pid => nil, :attempts => self.attempts + 1 }) end - + + # If the node can't process the unit, cancel it's reservation. + def cancel_reservation + update_attributes!(:reservation => nil) + end + # When a Node checks out a WorkUnit, establish the connection between # WorkUnit and NodeRecord and record the worker_pid. def assign_to(node_record, worker_pid) update_attributes!(:node_record => node_record, :worker_pid => worker_pid) end - - # All output needs to be wrapped in a JSON object for consistency - # (unfortunately, JSON.parse needs the top-level to be an object or array). + + # All output needs to be wrapped in a JSON object for consistency + # (unfortunately, JSON.parse needs the top-level to be an object or array). # Convenience method to provide the parsed version. def parsed_output(out = self.output) JSON.parse(out)['output'] end - + # The JSON representation of a WorkUnit shares the Job's options with all # its cousin WorkUnits. def to_json { 'id' => self.id, @@ -163,8 +170,8 @@ 'action' => self.action, 'options' => JSON.parse(self.job.options), 'status' => self.status }.to_json end - + end end