lib/cloud_crowd/models/work_unit.rb in cloud-crowd-0.2.8 vs lib/cloud_crowd/models/work_unit.rb in cloud-crowd-0.2.9
- old
+ new
@@ -21,12 +21,12 @@
validates_presence_of :job_id, :status, :input, :action
# Available WorkUnits are waiting to be distributed to Nodes for processing.
named_scope :available, {:conditions => {:reservation => nil, :worker_pid => nil, :status => INCOMPLETE}}
# Reserved WorkUnits have been marked for distribution by a central server process.
- named_scope :reserved, lambda {|reservation_number|
- {:conditions => {:reservation => reservation_number}, :order => 'updated_at asc'}
+ named_scope :reserved, lambda {|reservation|
+ {:conditions => {:reservation => reservation}, :order => 'updated_at asc'}
}
# Attempt to send a list of WorkUnits to nodes with available capacity.
# A single central server process stops the same WorkUnit from being
# distributed to multiple nodes by reserving it first. The algorithm used
@@ -36,40 +36,41 @@
# and try to match them to Nodes that are capable of handling the Action.
# WorkUnits get removed from the availability list when they are
# successfully sent, and Nodes get removed when they are busy or have the
# action in question disabled.
def self.distribute_to_nodes
- begin
- return unless reservation_number = WorkUnit.reserve_available(:limit => RESERVATION_LIMIT)
- work_units = WorkUnit.reserved(reservation_number)
+ reservation = nil
+ loop do
+ return unless reservation = WorkUnit.reserve_available(:limit => RESERVATION_LIMIT)
+ work_units = WorkUnit.reserved(reservation)
available_nodes = NodeRecord.available
while node = available_nodes.shift and unit = work_units.shift do
if node.actions.include? unit.action
if node.send_work_unit(unit)
available_nodes.push(node) unless node.busy?
next
end
end
work_units.push(unit)
end
- retry if work_units.empty? && !available_nodes.empty?
- ensure
- WorkUnit.cancel_reservations(reservation_number) if reservation_number
+ return if work_units.any? || available_nodes.empty?
end
+ ensure
+ WorkUnit.cancel_reservations(reservation) if reservation
end
# Reserves all available WorkUnits for this process. Returns false if there
# were none available.
def self.reserve_available(options={})
- reservation_number = ActiveSupport::SecureRandom.random_number(MAX_RESERVATION)
- any = WorkUnit.available.update_all("reservation = #{reservation_number}", nil, options) > 0
- any && reservation_number
+ reservation = ActiveSupport::SecureRandom.random_number(MAX_RESERVATION)
+ any = WorkUnit.available.update_all("reservation = #{reservation}", nil, options) > 0
+ any && reservation
end
# Cancels all outstanding WorkUnit reservations for this process.
- def self.cancel_reservations(reservation_number)
- WorkUnit.reserved(reservation_number).update_all('reservation = null')
+ def self.cancel_reservations(reservation)
+ WorkUnit.reserved(reservation).update_all('reservation = null')
end
# Cancels all outstanding WorkUnit reservations for all processes. (Useful
# in the console for debugging.)
def self.cancel_all_reservations
@@ -93,10 +94,11 @@
# Splitting work units are handled differently (an optimization) -- they
# immediately fire off all of their resulting WorkUnits for processing,
# without waiting for the rest of their splitting cousins to complete.
def finish(result, time_taken)
if splitting?
- [JSON.parse(parsed_output(result))].flatten.each do |new_input|
+ [parsed_output(result)].flatten.each do |new_input|
+ new_input = new_input.to_json unless new_input.is_a? String
WorkUnit.start(job, action, new_input, PROCESSING)
end
self.destroy
job.set_next_status if job && job.done_splitting?
else