lib/cloud_crowd/models/work_unit.rb in cloud-crowd-0.2.7 vs lib/cloud_crowd/models/work_unit.rb in cloud-crowd-0.2.8

- old
+ new

@@ -9,10 +9,14 @@

     # We use a random number in (0...MAX_RESERVATION) to reserve work units.
     # The size of the maximum signed integer in MySQL -- SQLite has no limit.
     MAX_RESERVATION = 2147483647

+    # We only reserve a certain number of WorkUnits in a single go, to avoid
+    # reserving the entire table.
+    RESERVATION_LIMIT = 25
+
     belongs_to :job
     belongs_to :node_record

     validates_presence_of :job_id, :status, :input, :action

@@ -26,35 +30,39 @@
     # Attempt to send a list of WorkUnits to nodes with available capacity.
     # A single central server process stops the same WorkUnit from being
     # distributed to multiple nodes by reserving it first. The algorithm used
     # should be lock-free.
     #
-    # We loop over the WorkUnits reserved by this process and try to match them
-    # to Nodes that are capable of handling the Action. WorkUnits get removed
-    # from the availability list when they are successfully sent, and Nodes get
-    # removed when they are busy or have the action in question disabled.
+    # We reserve WorkUnits for this process in chunks of RESERVATION_LIMIT size,
+    # and try to match them to Nodes that are capable of handling the Action.
+    # WorkUnits get removed from the availability list when they are
+    # successfully sent, and Nodes get removed when they are busy or have the
+    # action in question disabled.
     def self.distribute_to_nodes
-      return unless reservation_number = WorkUnit.reserve_available
-      work_units = WorkUnit.reserved(reservation_number)
-      available_nodes = NodeRecord.available
-      while node = available_nodes.shift and unit = work_units.shift do
-        if node.actions.include? unit.action
-          if node.send_work_unit(unit)
-            available_nodes.push(node) unless node.busy?
-            next
+      begin
+        return unless reservation_number = WorkUnit.reserve_available(:limit => RESERVATION_LIMIT)
+        work_units = WorkUnit.reserved(reservation_number)
+        available_nodes = NodeRecord.available
+        while node = available_nodes.shift and unit = work_units.shift do
+          if node.actions.include? unit.action
+            if node.send_work_unit(unit)
+              available_nodes.push(node) unless node.busy?
+              next
+            end
           end
+          work_units.push(unit)
         end
-        work_units.push(unit)
+        retry if work_units.empty? && !available_nodes.empty?
+      ensure
+        WorkUnit.cancel_reservations(reservation_number) if reservation_number
       end
-    ensure
-      WorkUnit.cancel_reservations(reservation_number) if reservation_number
     end
     # Reserves all available WorkUnits for this process. Returns false if there
     # were none available.
-    def self.reserve_available
+    def self.reserve_available(options={})
       reservation_number = ActiveSupport::SecureRandom.random_number(MAX_RESERVATION)
-      any = WorkUnit.available.update_all("reservation = #{reservation_number}") > 0
+      any = WorkUnit.available.update_all("reservation = #{reservation_number}", nil, options) > 0
       any && reservation_number
     end

     # Cancels all outstanding WorkUnit reservations for this process.
     def self.cancel_reservations(reservation_number)