lib/cloud_crowd/models/work_unit.rb in cloud-crowd-0.6.2 vs lib/cloud_crowd/models/work_unit.rb in cloud-crowd-0.7.0.pre

- old
+ new

@@ -4,11 +4,11 @@ # through a single action. The WorkUnits are run in parallel, with each worker # daemon processing one at a time. The splitting and merging stages of a job # are each run as a single WorkUnit. class WorkUnit < ActiveRecord::Base include ModelStatus - + # We use a random number in (0...MAX_RESERVATION) to reserve work units. # The size of the maximum signed integer in MySQL -- SQLite has no limit. MAX_RESERVATION = 2147483647 # We only reserve a certain number of WorkUnits in a single go, to avoid @@ -19,15 +19,13 @@ belongs_to :node_record validates_presence_of :job_id, :status, :input, :action # Available WorkUnits are waiting to be distributed to Nodes for processing. - named_scope :available, {:conditions => {:reservation => nil, :worker_pid => nil, :status => INCOMPLETE}} + scope :available, -> { where(:reservation => nil, :worker_pid => nil, :status => INCOMPLETE) } # Reserved WorkUnits have been marked for distribution by a central server process. - named_scope :reserved, lambda {|reservation| - {:conditions => {:reservation => reservation}, :order => 'updated_at asc'} - } + scope :reserved, ->(reservation) { where(:reservation => reservation).order('updated_at asc') } # Attempt to send a list of WorkUnits to nodes with available capacity. # A single central server process stops the same WorkUnit from being # distributed to multiple nodes by reserving it first. The algorithm used # should be lock-free. @@ -41,31 +39,29 @@ reservation = nil loop do # Find the available nodes, and determine what actions we're capable # of running at the moment. - available_nodes = NodeRecord.available + available_nodes = NodeRecord.available.to_a available_actions = available_nodes.map {|node| node.actions }.flatten.uniq filter = "action in (#{available_actions.map{|a| "'#{a}'"}.join(',')})" # Reserve a handful of available work units. WorkUnit.cancel_reservations(reservation) if reservation return unless reservation = WorkUnit.reserve_available(:limit => RESERVATION_LIMIT, :conditions => filter) - work_units = WorkUnit.reserved(reservation) + work_units = WorkUnit.reserved(reservation).to_a # Round robin through the nodes and units, sending the unit if the node # is able to process it. - work_units.each do |unit| - available_nodes.each do |node| - if node.actions.include? unit.action - if node.send_work_unit unit - work_units.delete unit - available_nodes.delete node if node.busy? - break - end + while (unit = work_units.shift) and available_nodes.any? do + while node = available_nodes.shift do + if node.actions.include?(unit.action) and node.send_work_unit(unit) + available_nodes.push(node) unless node.busy? + break end end + work_units.push(unit) unless unit.assigned? end # If we still have units at this point, or we're fresh out of nodes, # that means we're done. return if work_units.any? || available_nodes.empty? @@ -75,13 +71,15 @@ end # Reserves all available WorkUnits for this process. Returns false if there # were none available. def self.reserve_available(options={}) - reservation = ActiveSupport::SecureRandom.random_number(MAX_RESERVATION) + reservation = SecureRandom.random_number(MAX_RESERVATION) conditions = "reservation is null and node_record_id is null and status in (#{INCOMPLETE.join(',')}) and #{options[:conditions]}" - any = WorkUnit.update_all("reservation = #{reservation}", conditions, options) > 0 + query = WorkUnit.where(conditions) + query.limit(options[:limit]) if options[:limit] + any = query.update_all("reservation = #{reservation}") > 0 any && reservation end # Cancels all outstanding WorkUnit reservations for this process. def self.cancel_reservations(reservation) @@ -164,29 +162,31 @@ # When a Node checks out a WorkUnit, establish the connection between # WorkUnit and NodeRecord and record the worker_pid. def assign_to(node_record, worker_pid) update_attributes!(:node_record => node_record, :worker_pid => worker_pid) end + + def assigned? + !!(node_record_id && worker_pid) + end # All output needs to be wrapped in a JSON object for consistency # (unfortunately, JSON.parse needs the top-level to be an object or array). # Convenience method to provide the parsed version. def parsed_output(out = self.output) JSON.parse(out)['output'] end # The JSON representation of a WorkUnit shares the Job's options with all # its cousin WorkUnits. - def to_json - { - 'id' => self.id, - 'job_id' => self.job_id, - 'input' => self.input, - 'attempts' => self.attempts, - 'action' => self.action, - 'options' => JSON.parse(self.job.options), - 'status' => self.status - }.to_json + class Serializer < ActiveModel::Serializer + attributes :id, :job_id, :input, :attempts, :action, :options, :status + + def options; JSON.parse(object.job.options); end end + + def active_model_serializer; Serializer; end + def to_json; Serializer.new(self).to_json; end end end +require 'securerandom'