lib/cloud_crowd/models/work_unit.rb in cloud-crowd-0.6.2 vs lib/cloud_crowd/models/work_unit.rb in cloud-crowd-0.7.0.pre
- old
+ new
@@ -4,11 +4,11 @@
# through a single action. The WorkUnits are run in parallel, with each worker
# daemon processing one at a time. The splitting and merging stages of a job
# are each run as a single WorkUnit.
class WorkUnit < ActiveRecord::Base
include ModelStatus
-
+
# We use a random number in (0...MAX_RESERVATION) to reserve work units.
# The size of the maximum signed integer in MySQL -- SQLite has no limit.
MAX_RESERVATION = 2147483647
# We only reserve a certain number of WorkUnits in a single go, to avoid
@@ -19,15 +19,13 @@
belongs_to :node_record
validates_presence_of :job_id, :status, :input, :action
# Available WorkUnits are waiting to be distributed to Nodes for processing.
- named_scope :available, {:conditions => {:reservation => nil, :worker_pid => nil, :status => INCOMPLETE}}
+ scope :available, -> { where(:reservation => nil, :worker_pid => nil, :status => INCOMPLETE) }
# Reserved WorkUnits have been marked for distribution by a central server process.
- named_scope :reserved, lambda {|reservation|
- {:conditions => {:reservation => reservation}, :order => 'updated_at asc'}
- }
+ scope :reserved, ->(reservation) { where(:reservation => reservation).order('updated_at asc') }
# Attempt to send a list of WorkUnits to nodes with available capacity.
# A single central server process stops the same WorkUnit from being
# distributed to multiple nodes by reserving it first. The algorithm used
# should be lock-free.
@@ -41,31 +39,29 @@
reservation = nil
loop do
# Find the available nodes, and determine what actions we're capable
# of running at the moment.
- available_nodes = NodeRecord.available
+ available_nodes = NodeRecord.available.to_a
available_actions = available_nodes.map {|node| node.actions }.flatten.uniq
filter = "action in (#{available_actions.map{|a| "'#{a}'"}.join(',')})"
# Reserve a handful of available work units.
WorkUnit.cancel_reservations(reservation) if reservation
return unless reservation = WorkUnit.reserve_available(:limit => RESERVATION_LIMIT, :conditions => filter)
- work_units = WorkUnit.reserved(reservation)
+ work_units = WorkUnit.reserved(reservation).to_a
# Round robin through the nodes and units, sending the unit if the node
# is able to process it.
- work_units.each do |unit|
- available_nodes.each do |node|
- if node.actions.include? unit.action
- if node.send_work_unit unit
- work_units.delete unit
- available_nodes.delete node if node.busy?
- break
- end
+ while (unit = work_units.shift) and available_nodes.any? do
+ while node = available_nodes.shift do
+ if node.actions.include?(unit.action) and node.send_work_unit(unit)
+ available_nodes.push(node) unless node.busy?
+ break
end
end
+ work_units.push(unit) unless unit.assigned?
end
# If we still have units at this point, or we're fresh out of nodes,
# that means we're done.
return if work_units.any? || available_nodes.empty?
@@ -75,13 +71,15 @@
end
# Reserves all available WorkUnits for this process. Returns false if there
# were none available.
def self.reserve_available(options={})
- reservation = ActiveSupport::SecureRandom.random_number(MAX_RESERVATION)
+ reservation = SecureRandom.random_number(MAX_RESERVATION)
conditions = "reservation is null and node_record_id is null and status in (#{INCOMPLETE.join(',')}) and #{options[:conditions]}"
- any = WorkUnit.update_all("reservation = #{reservation}", conditions, options) > 0
+ query = WorkUnit.where(conditions)
+ query.limit(options[:limit]) if options[:limit]
+ any = query.update_all("reservation = #{reservation}") > 0
any && reservation
end
# Cancels all outstanding WorkUnit reservations for this process.
def self.cancel_reservations(reservation)
@@ -164,29 +162,31 @@
# When a Node checks out a WorkUnit, establish the connection between
# WorkUnit and NodeRecord and record the worker_pid.
def assign_to(node_record, worker_pid)
update_attributes!(:node_record => node_record, :worker_pid => worker_pid)
end
+
+ def assigned?
+ !!(node_record_id && worker_pid)
+ end
# All output needs to be wrapped in a JSON object for consistency
# (unfortunately, JSON.parse needs the top-level to be an object or array).
# Convenience method to provide the parsed version.
def parsed_output(out = self.output)
JSON.parse(out)['output']
end
# The JSON representation of a WorkUnit shares the Job's options with all
# its cousin WorkUnits.
- def to_json
- {
- 'id' => self.id,
- 'job_id' => self.job_id,
- 'input' => self.input,
- 'attempts' => self.attempts,
- 'action' => self.action,
- 'options' => JSON.parse(self.job.options),
- 'status' => self.status
- }.to_json
+ class Serializer < ActiveModel::Serializer
+ attributes :id, :job_id, :input, :attempts, :action, :options, :status
+
+ def options; JSON.parse(object.job.options); end
end
+
+ def active_model_serializer; Serializer; end
+ def to_json; Serializer.new(self).to_json; end
end
end
+require 'securerandom'