lib/cloud_crowd/models/work_unit.rb in cloud-crowd-0.2.7 vs lib/cloud_crowd/models/work_unit.rb in cloud-crowd-0.2.8
- old
+ new
@@ -9,10 +9,14 @@
# We use a random number in (0...MAX_RESERVATION) to reserve work units.
# The size of the maximum signed integer in MySQL -- SQLite has no limit.
MAX_RESERVATION = 2147483647
+ # We only reserve a certain number of WorkUnits in a single go, to avoid
+ # reserving the entire table.
+ RESERVATION_LIMIT = 25
+
belongs_to :job
belongs_to :node_record
validates_presence_of :job_id, :status, :input, :action
@@ -26,35 +30,39 @@
# Attempt to send a list of WorkUnits to nodes with available capacity.
# A single central server process stops the same WorkUnit from being
# distributed to multiple nodes by reserving it first. The algorithm used
# should be lock-free.
#
- # We loop over the WorkUnits reserved by this process and try to match them
- # to Nodes that are capable of handling the Action. WorkUnits get removed
- # from the availability list when they are successfully sent, and Nodes get
- # removed when they are busy or have the action in question disabled.
+ # We reserve WorkUnits for this process in chunks of RESERVATION_LIMIT size,
+ # and try to match them to Nodes that are capable of handling the Action.
+ # WorkUnits get removed from the availability list when they are
+ # successfully sent, and Nodes get removed when they are busy or have the
+ # action in question disabled.
def self.distribute_to_nodes
- return unless reservation_number = WorkUnit.reserve_available
- work_units = WorkUnit.reserved(reservation_number)
- available_nodes = NodeRecord.available
- while node = available_nodes.shift and unit = work_units.shift do
- if node.actions.include? unit.action
- if node.send_work_unit(unit)
- available_nodes.push(node) unless node.busy?
- next
+ begin
+ return unless reservation_number = WorkUnit.reserve_available(:limit => RESERVATION_LIMIT)
+ work_units = WorkUnit.reserved(reservation_number)
+ available_nodes = NodeRecord.available
+ while node = available_nodes.shift and unit = work_units.shift do
+ if node.actions.include? unit.action
+ if node.send_work_unit(unit)
+ available_nodes.push(node) unless node.busy?
+ next
+ end
end
+ work_units.push(unit)
end
- work_units.push(unit)
+ retry if work_units.empty? && !available_nodes.empty?
+ ensure
+ WorkUnit.cancel_reservations(reservation_number) if reservation_number
end
- ensure
- WorkUnit.cancel_reservations(reservation_number) if reservation_number
end
# Reserves all available WorkUnits for this process. Returns false if there
# were none available.
- def self.reserve_available
+ def self.reserve_available(options={})
reservation_number = ActiveSupport::SecureRandom.random_number(MAX_RESERVATION)
- any = WorkUnit.available.update_all("reservation = #{reservation_number}") > 0
+ any = WorkUnit.available.update_all("reservation = #{reservation_number}", nil, options) > 0
any && reservation_number
end
# Cancels all outstanding WorkUnit reservations for this process.
def self.cancel_reservations(reservation_number)