lib/cloud_crowd/models/work_unit.rb in cloud-crowd-0.5.2 vs lib/cloud_crowd/models/work_unit.rb in cloud-crowd-0.6.0

- old
+ new

@@ -37,30 +37,41 @@ # WorkUnits get removed from the availability list when they are # successfully sent, and Nodes get removed when they are busy or have the # action in question disabled. def self.distribute_to_nodes reservation = nil - filter = {} loop do + + # Find the available nodes, and determine what actions we're capable + # of running at the moment. + available_nodes = NodeRecord.available + available_actions = available_nodes.map {|node| node.actions }.flatten.uniq + filter = "action in (#{available_actions.map{|a| "'#{a}'"}.join(',')})" + + # Reserve a handful of available work units. WorkUnit.cancel_reservations(reservation) if reservation return unless reservation = WorkUnit.reserve_available(:limit => RESERVATION_LIMIT, :conditions => filter) work_units = WorkUnit.reserved(reservation) - available_nodes = NodeRecord.available - while node = available_nodes.shift and unit = work_units.shift do + + # Round robin through the nodes and units, sending the unit if the node + # is able to process it. + while (node = available_nodes.shift) && (unit = work_units.shift) do if node.actions.include?(unit.action) if node.send_work_unit(unit) available_nodes.push(node) unless node.busy? next end else unit.cancel_reservation end work_units.push(unit) end - if work_units.any? && available_nodes.any? - filter = {:action => available_nodes.map {|node| node.actions }.flatten.uniq } - next - end + + # If there are both units and nodes left over, try again. + next if work_units.any? && available_nodes.any? + + # If we still have units at this point, or we're fresh out of nodes, + # that means we're done. return if work_units.any? || available_nodes.empty? end ensure WorkUnit.cancel_reservations(reservation) if reservation end