lib/cloud_crowd/models/work_unit.rb in cloud-crowd-0.2.3 vs lib/cloud_crowd/models/work_unit.rb in cloud-crowd-0.2.4

- old
+ new

@@ -5,19 +5,25 @@ # daemon processing one at a time. The splitting and merging stages of a job # are each run as a single WorkUnit. class WorkUnit < ActiveRecord::Base include ModelStatus + # We use a random number in (0...MAX_RESERVATION) to reserve work units. + # The size of the maximum signed integer in MySQL -- SQLite has no limit. + MAX_RESERVATION = 2147483647 + belongs_to :job belongs_to :node_record validates_presence_of :job_id, :status, :input, :action # Available WorkUnits are waiting to be distributed to Nodes for processing. named_scope :available, {:conditions => {:reservation => nil, :worker_pid => nil, :status => INCOMPLETE}} # Reserved WorkUnits have been marked for distribution by a central server process. - named_scope :reserved, {:conditions => {:reservation => $$}, :order => 'updated_at asc'} + named_scope :reserved, lambda {|reservation_number| + {:conditions => {:reservation => reservation_number}, :order => 'updated_at asc'} + } # Attempt to send a list of WorkUnits to nodes with available capacity. # A single central server process stops the same WorkUnit from being # distributed to multiple nodes by reserving it first. The algorithm used # should be lock-free. @@ -25,12 +31,12 @@ # We loop over the WorkUnits reserved by this process and try to match them # to Nodes that are capable of handling the Action. WorkUnits get removed # from the availability list when they are successfully sent, and Nodes get # removed when they are busy or have the action in question disabled. def self.distribute_to_nodes - return unless WorkUnit.reserve_available - work_units = WorkUnit.reserved + return unless reservation_number = WorkUnit.reserve_available + work_units = WorkUnit.reserved(reservation_number) available_nodes = NodeRecord.available while node = available_nodes.shift and unit = work_units.shift do if node.actions.include? unit.action if node.send_work_unit(unit) available_nodes.push(node) unless node.busy? @@ -38,21 +44,23 @@ end end work_units.push(unit) end ensure - WorkUnit.cancel_reservations + WorkUnit.cancel_reservations(reservation_number) end # Reserves all available WorkUnits for this process. Returns false if there # were none available. def self.reserve_available - WorkUnit.available.update_all("reservation = #{$$}") > 0 + reservation_number = ActiveSupport::SecureRandom.random_number(MAX_RESERVATION) + any = WorkUnit.available.update_all("reservation = #{reservation_number}") > 0 + any && reservation_number end # Cancels all outstanding WorkUnit reservations for this process. - def self.cancel_reservations - WorkUnit.reserved.update_all('reservation = null') + def self.cancel_reservations(reservation_number) + WorkUnit.reserved(reservation_number).update_all('reservation = null') end # Cancels all outstanding WorkUnit reservations for all processes. (Useful # in the console for debugging.) def self.cancel_all_reservations