lib/cloud_crowd/models/work_unit.rb in cloud-crowd-0.2.0 vs lib/cloud_crowd/models/work_unit.rb in cloud-crowd-0.2.1
- old
+ new
@@ -26,11 +26,11 @@
work_units = WorkUnit.reserved
available_nodes = NodeRecord.available
until work_units.empty? do
node = available_nodes.shift
unit = work_units.first
- break unless node
+ break unless node && unit
next unless node.actions.include? unit.action
sent = node.send_work_unit(unit)
if sent
work_units.shift
available_nodes.push(node) unless node.busy?
@@ -49,10 +49,16 @@
# Cancels all outstanding WorkUnit reservations for this process.
def self.cancel_reservations
WorkUnit.reserved.update_all('reservation = null')
end
+ # Cancels all outstanding WorkUnit reservations for all processes. (Useful
+ # in the console for debugging.)
+ def self.cancel_all_reservations
+ WorkUnit.update_all('reservation = null')
+ end
+
# Look up a WorkUnit by the worker that's currently processing it. Specified
# by <tt>pid@host</tt>.
def self.find_by_worker_name(name)
pid, host = name.split('@')
node = NodeRecord.find_by_host(host)
@@ -72,20 +78,20 @@
if splitting?
[JSON.parse(parsed_output(result))].flatten.each do |new_input|
WorkUnit.start(job, action, new_input, PROCESSING)
end
self.destroy
- job.set_next_status if job.done_splitting?
+ job.set_next_status if job && job.done_splitting?
else
update_attributes({
:status => SUCCEEDED,
:node_record => nil,
:worker_pid => nil,
:attempts => attempts + 1,
:output => result,
:time => time_taken
})
- job.check_for_completion
+ job && job.check_for_completion
end
end
# Mark this unit as having failed. May attempt a retry.
def fail(output, time_taken)