lib/cloud_crowd/models/work_unit.rb in cloud-crowd-0.2.8 vs lib/cloud_crowd/models/work_unit.rb in cloud-crowd-0.2.9

- old
+ new

@@ -21,12 +21,12 @@ validates_presence_of :job_id, :status, :input, :action # Available WorkUnits are waiting to be distributed to Nodes for processing. named_scope :available, {:conditions => {:reservation => nil, :worker_pid => nil, :status => INCOMPLETE}} # Reserved WorkUnits have been marked for distribution by a central server process. - named_scope :reserved, lambda {|reservation_number| - {:conditions => {:reservation => reservation_number}, :order => 'updated_at asc'} + named_scope :reserved, lambda {|reservation| + {:conditions => {:reservation => reservation}, :order => 'updated_at asc'} } # Attempt to send a list of WorkUnits to nodes with available capacity. # A single central server process stops the same WorkUnit from being # distributed to multiple nodes by reserving it first. The algorithm used @@ -36,40 +36,41 @@ # and try to match them to Nodes that are capable of handling the Action. # WorkUnits get removed from the availability list when they are # successfully sent, and Nodes get removed when they are busy or have the # action in question disabled. def self.distribute_to_nodes - begin - return unless reservation_number = WorkUnit.reserve_available(:limit => RESERVATION_LIMIT) - work_units = WorkUnit.reserved(reservation_number) + reservation = nil + loop do + return unless reservation = WorkUnit.reserve_available(:limit => RESERVATION_LIMIT) + work_units = WorkUnit.reserved(reservation) available_nodes = NodeRecord.available while node = available_nodes.shift and unit = work_units.shift do if node.actions.include? unit.action if node.send_work_unit(unit) available_nodes.push(node) unless node.busy? next end end work_units.push(unit) end - retry if work_units.empty? && !available_nodes.empty? - ensure - WorkUnit.cancel_reservations(reservation_number) if reservation_number + return if work_units.any? || available_nodes.empty? end + ensure + WorkUnit.cancel_reservations(reservation) if reservation end # Reserves all available WorkUnits for this process. Returns false if there # were none available. def self.reserve_available(options={}) - reservation_number = ActiveSupport::SecureRandom.random_number(MAX_RESERVATION) - any = WorkUnit.available.update_all("reservation = #{reservation_number}", nil, options) > 0 - any && reservation_number + reservation = ActiveSupport::SecureRandom.random_number(MAX_RESERVATION) + any = WorkUnit.available.update_all("reservation = #{reservation}", nil, options) > 0 + any && reservation end # Cancels all outstanding WorkUnit reservations for this process. - def self.cancel_reservations(reservation_number) - WorkUnit.reserved(reservation_number).update_all('reservation = null') + def self.cancel_reservations(reservation) + WorkUnit.reserved(reservation).update_all('reservation = null') end # Cancels all outstanding WorkUnit reservations for all processes. (Useful # in the console for debugging.) def self.cancel_all_reservations @@ -93,10 +94,11 @@ # Splitting work units are handled differently (an optimization) -- they # immediately fire off all of their resulting WorkUnits for processing, # without waiting for the rest of their splitting cousins to complete. def finish(result, time_taken) if splitting? - [JSON.parse(parsed_output(result))].flatten.each do |new_input| + [parsed_output(result)].flatten.each do |new_input| + new_input = new_input.to_json unless new_input.is_a? String WorkUnit.start(job, action, new_input, PROCESSING) end self.destroy job.set_next_status if job && job.done_splitting? else