lib/cloud_crowd/models/work_unit.rb in cloud-crowd-0.5.2 vs lib/cloud_crowd/models/work_unit.rb in cloud-crowd-0.6.0
- old
+ new
@@ -37,30 +37,41 @@
# WorkUnits get removed from the availability list when they are
# successfully sent, and Nodes get removed when they are busy or have the
# action in question disabled.
def self.distribute_to_nodes
reservation = nil
- filter = {}
loop do
+
+ # Find the available nodes, and determine what actions we're capable
+ # of running at the moment.
+ available_nodes = NodeRecord.available
+ available_actions = available_nodes.map {|node| node.actions }.flatten.uniq
+ filter = "action in (#{available_actions.map{|a| "'#{a}'"}.join(',')})"
+
+ # Reserve a handful of available work units.
WorkUnit.cancel_reservations(reservation) if reservation
return unless reservation = WorkUnit.reserve_available(:limit => RESERVATION_LIMIT, :conditions => filter)
work_units = WorkUnit.reserved(reservation)
- available_nodes = NodeRecord.available
- while node = available_nodes.shift and unit = work_units.shift do
+
+ # Round robin through the nodes and units, sending the unit if the node
+ # is able to process it.
+ while (node = available_nodes.shift) && (unit = work_units.shift) do
if node.actions.include?(unit.action)
if node.send_work_unit(unit)
available_nodes.push(node) unless node.busy?
next
end
else
unit.cancel_reservation
end
work_units.push(unit)
end
- if work_units.any? && available_nodes.any?
- filter = {:action => available_nodes.map {|node| node.actions }.flatten.uniq }
- next
- end
+
+ # If there are both units and nodes left over, try again.
+ next if work_units.any? && available_nodes.any?
+
+ # If we still have units at this point, or we're fresh out of nodes,
+ # that means we're done.
return if work_units.any? || available_nodes.empty?
end
ensure
WorkUnit.cancel_reservations(reservation) if reservation
end