lib/cloud_crowd/models/work_unit.rb in cloud-crowd-0.2.1 vs lib/cloud_crowd/models/work_unit.rb in cloud-crowd-0.2.2
- old
+ new
@@ -15,28 +15,31 @@
# Available WorkUnits are waiting to be distributed to Nodes for processing.
named_scope :available, {:conditions => {:reservation => nil, :worker_pid => nil, :status => INCOMPLETE}}
# Reserved WorkUnits have been marked for distribution by a central server process.
named_scope :reserved, {:conditions => {:reservation => $$}, :order => 'updated_at asc'}
- # Attempt to send a list of work_units to nodes with available capacity.
+ # Attempt to send a list of WorkUnits to nodes with available capacity.
# A single central server process stops the same WorkUnit from being
# distributed to multiple nodes by reserving it first. The algorithm used
# should be lock-free.
+ #
+ # We loop over the WorkUnits reserved by this process and try to match them
+ # to Nodes that are capable of handling the Action. WorkUnits get removed
+ # from the availability list when they are successfully sent, and Nodes get
+ # removed when they are busy or have the action in question disabled.
def self.distribute_to_nodes
return unless WorkUnit.reserve_available
work_units = WorkUnit.reserved
available_nodes = NodeRecord.available
- until work_units.empty? do
- node = available_nodes.shift
- unit = work_units.first
- break unless node && unit
- next unless node.actions.include? unit.action
- sent = node.send_work_unit(unit)
- if sent
- work_units.shift
- available_nodes.push(node) unless node.busy?
+ while node = available_nodes.shift and unit = work_units.shift do
+ if node.actions.include? unit.action
+ if node.send_work_unit(unit)
+ available_nodes.push(node) unless node.busy?
+ next
+ end
end
+ work_units.push(unit)
end
ensure
WorkUnit.cancel_reservations
end
@@ -104,10 +107,10 @@
:worker_pid => nil,
:attempts => tries,
:output => output,
:time => time_taken
})
- self.job.check_for_completion
+ job && job.check_for_completion
end
# Ever tried. Ever failed. No matter. Try again. Fail again. Fail better.
def try_again
update_attributes({