lib/cloud_crowd/models/work_unit.rb in cloud-crowd-0.2.1 vs lib/cloud_crowd/models/work_unit.rb in cloud-crowd-0.2.2

- old
+ new

@@ -15,28 +15,31 @@
 # Available WorkUnits are waiting to be distributed to Nodes for processing.
 named_scope :available, {:conditions => {:reservation => nil, :worker_pid => nil, :status => INCOMPLETE}}
 # Reserved WorkUnits have been marked for distribution by a central server process.
 named_scope :reserved, {:conditions => {:reservation => $$}, :order => 'updated_at asc'}
-# Attempt to send a list of work_units to nodes with available capacity.
+# Attempt to send a list of WorkUnits to nodes with available capacity.
 # A single central server process stops the same WorkUnit from being
 # distributed to multiple nodes by reserving it first. The algorithm used
 # should be lock-free.
+#
+# We loop over the WorkUnits reserved by this process and try to match them
+# to Nodes that are capable of handling the Action. WorkUnits get removed
+# from the availability list when they are successfully sent, and Nodes get
+# removed when they are busy or have the action in question disabled.
 def self.distribute_to_nodes
   return unless WorkUnit.reserve_available
   work_units = WorkUnit.reserved
   available_nodes = NodeRecord.available
-  until work_units.empty? do
-    node = available_nodes.shift
-    unit = work_units.first
-    break unless node && unit
-    next unless node.actions.include? unit.action
-    sent = node.send_work_unit(unit)
-    if sent
-      work_units.shift
-      available_nodes.push(node) unless node.busy?
+  while node = available_nodes.shift and unit = work_units.shift do
+    if node.actions.include? unit.action
+      if node.send_work_unit(unit)
+        available_nodes.push(node) unless node.busy?
+        next
+      end
     end
+    work_units.push(unit)
   end
 ensure
   WorkUnit.cancel_reservations
 end
@@ -104,10 +107,10 @@
     :worker_pid => nil,
     :attempts => tries,
     :output => output,
     :time => time_taken
   })
-  self.job.check_for_completion
+  job && job.check_for_completion
 end
 # Ever tried. Ever failed. No matter. Try again. Fail again. Fail better.
 def try_again
   update_attributes({