lib/right_chimp/queue/ChimpQueue.rb in right_chimp-1.1.3 vs lib/right_chimp/queue/ChimpQueue.rb in right_chimp-2.0

- old
+ new

@@ -3,98 +3,98 @@ # The ChimpQueue is a singleton that contains the # chimp work queue # class ChimpQueue include Singleton - + attr_accessor :delay, :retry_count, :max_threads, :group - + def initialize @delay = 0 @retry_count = 0 @max_threads = 10 @workers_never_exit = true @threads = [] @semaphore = Mutex.new self.reset! end - + # # Reset the queue and the :default group # # This doesn't do anything to the groups's jobs # def reset! @group = {} @group[:default] = ParallelExecutionGroup.new(:default) end - + # # Start up queue runners # def start self.sort_queues! - + for i in (1..max_threads) @threads << Thread.new(i) do worker = QueueWorker.new worker.delay = @delay worker.retry_count = @retry_count worker.run end end end - + # # Push a task into the queue # def push(g, w) raise "no group specified" unless g create_group(g) if not ChimpQueue[g] ChimpQueue[g].push(w) unless ChimpQueue[g].get_job(w.job_id) end - + def create_group(name, type = :parallel, concurrency = 1) Log.debug "Creating new execution group #{name} type=#{type} concurrency=#{concurrency}" new_group = ExecutionGroupFactory.from_type(type) new_group.group_id = name new_group.concurrency = concurrency ChimpQueue[name] = new_group end - + # # Grab the oldest work item available # def shift r = nil @semaphore.synchronize do @group.values.each do |group| if group.ready? r = group.shift - Log.debug "Shifting job '#{r.job_id}' from group '#{group.group_id}'" + Log.debug "Shifting job '#{r.job_id}' from group '#{group.group_id}'" unless r.nil? break end - end + end end return(r) end - + # # Wait until a group is done # def wait_until_done(g, &block) - while @group[g].running? + while @group[g].running? @threads.each do |t| t.join(1) - yield + yield end end end - + # # Quit - empty the queue and wait for remaining jobs to complete - # + # def quit i = 0 @group.keys.each do |group| wait_until_done(group) do if i < 30 @@ -104,86 +104,86 @@ else break end end end - + @threads.each { |t| t.kill } puts " done." end - + # # Run all threads forever (used by chimpd) # def run_threads - @threads.each do |t| + @threads.each do |t| t.join(5) end end - + # # return the total number of queued (non-executing) objects # def size s = 0 @group.values.each do |group| s += group.size end return(s) end - + # # Allow the groups to be accessed as ChimpQueue.group[:foo] # def self.[](group) return ChimpQueue.instance.group[group] end - + def self.[]=(k,v) ChimpQueue.instance.group[k] = v end - + # # Return an array of all jobs with the requested # status. # def get_jobs_by_status(status) r = [] - @group.values.each do |group| + @group.values.each do |group| v = group.get_jobs_by_status(status) if v != nil and v != [] r += v end end - + return r end - + def get_job(id) jobs = self.get_jobs - + jobs.each do |j| return j if j.job_id == id end end - + def get_jobs r = [] @group.values.each do |group| group.get_jobs.each { |job| r << job } end - + return r end - - ############################################################# + + ############################################################# protected - + # # Sort all the things, er, queues # def sort_queues! return @group.values.each { |group| group.sort! } end - + end end