lib/cloud_crowd/models/work_unit.rb in cloud-crowd-0.2.3 vs lib/cloud_crowd/models/work_unit.rb in cloud-crowd-0.2.4
- old
+ new
@@ -5,19 +5,25 @@
# daemon processing one at a time. The splitting and merging stages of a job
# are each run as a single WorkUnit.
class WorkUnit < ActiveRecord::Base
include ModelStatus
+ # We use a random number in (0...MAX_RESERVATION) to reserve work units.
+ # The size of the maximum signed integer in MySQL -- SQLite has no limit.
+ MAX_RESERVATION = 2147483647
+
belongs_to :job
belongs_to :node_record
validates_presence_of :job_id, :status, :input, :action
# Available WorkUnits are waiting to be distributed to Nodes for processing.
named_scope :available, {:conditions => {:reservation => nil, :worker_pid => nil, :status => INCOMPLETE}}
# Reserved WorkUnits have been marked for distribution by a central server process.
- named_scope :reserved, {:conditions => {:reservation => $$}, :order => 'updated_at asc'}
+ named_scope :reserved, lambda {|reservation_number|
+ {:conditions => {:reservation => reservation_number}, :order => 'updated_at asc'}
+ }
# Attempt to send a list of WorkUnits to nodes with available capacity.
# A single central server process stops the same WorkUnit from being
# distributed to multiple nodes by reserving it first. The algorithm used
# should be lock-free.
@@ -25,12 +31,12 @@
# We loop over the WorkUnits reserved by this process and try to match them
# to Nodes that are capable of handling the Action. WorkUnits get removed
# from the availability list when they are successfully sent, and Nodes get
# removed when they are busy or have the action in question disabled.
def self.distribute_to_nodes
- return unless WorkUnit.reserve_available
- work_units = WorkUnit.reserved
+ return unless reservation_number = WorkUnit.reserve_available
+ work_units = WorkUnit.reserved(reservation_number)
available_nodes = NodeRecord.available
while node = available_nodes.shift and unit = work_units.shift do
if node.actions.include? unit.action
if node.send_work_unit(unit)
available_nodes.push(node) unless node.busy?
@@ -38,21 +44,23 @@
end
end
work_units.push(unit)
end
ensure
- WorkUnit.cancel_reservations
+ WorkUnit.cancel_reservations(reservation_number)
end
# Reserves all available WorkUnits for this process. Returns false if there
# were none available.
def self.reserve_available
- WorkUnit.available.update_all("reservation = #{$$}") > 0
+ reservation_number = ActiveSupport::SecureRandom.random_number(MAX_RESERVATION)
+ any = WorkUnit.available.update_all("reservation = #{reservation_number}") > 0
+ any && reservation_number
end
# Cancels all outstanding WorkUnit reservations for this process.
- def self.cancel_reservations
- WorkUnit.reserved.update_all('reservation = null')
+ def self.cancel_reservations(reservation_number)
+ WorkUnit.reserved(reservation_number).update_all('reservation = null')
end
# Cancels all outstanding WorkUnit reservations for all processes. (Useful
# in the console for debugging.)
def self.cancel_all_reservations