lib/right_chimp/queue/ChimpQueue.rb in right_chimp-1.1.3 vs lib/right_chimp/queue/ChimpQueue.rb in right_chimp-2.0
- old
+ new
@@ -3,98 +3,98 @@
# The ChimpQueue is a singleton that contains the
# chimp work queue
#
class ChimpQueue
include Singleton
-
+
attr_accessor :delay, :retry_count, :max_threads, :group
-
+
def initialize
@delay = 0
@retry_count = 0
@max_threads = 10
@workers_never_exit = true
@threads = []
@semaphore = Mutex.new
self.reset!
end
-
+
#
# Reset the queue and the :default group
#
# This doesn't do anything to the groups's jobs
#
def reset!
@group = {}
@group[:default] = ParallelExecutionGroup.new(:default)
end
-
+
#
# Start up queue runners
#
def start
self.sort_queues!
-
+
for i in (1..max_threads)
@threads << Thread.new(i) do
worker = QueueWorker.new
worker.delay = @delay
worker.retry_count = @retry_count
worker.run
end
end
end
-
+
#
# Push a task into the queue
#
def push(g, w)
raise "no group specified" unless g
create_group(g) if not ChimpQueue[g]
ChimpQueue[g].push(w) unless ChimpQueue[g].get_job(w.job_id)
end
-
+
def create_group(name, type = :parallel, concurrency = 1)
Log.debug "Creating new execution group #{name} type=#{type} concurrency=#{concurrency}"
new_group = ExecutionGroupFactory.from_type(type)
new_group.group_id = name
new_group.concurrency = concurrency
ChimpQueue[name] = new_group
end
-
+
#
# Grab the oldest work item available
#
def shift
r = nil
@semaphore.synchronize do
@group.values.each do |group|
if group.ready?
r = group.shift
- Log.debug "Shifting job '#{r.job_id}' from group '#{group.group_id}'"
+ Log.debug "Shifting job '#{r.job_id}' from group '#{group.group_id}'" unless r.nil?
break
end
- end
+ end
end
return(r)
end
-
+
#
# Wait until a group is done
#
def wait_until_done(g, &block)
- while @group[g].running?
+ while @group[g].running?
@threads.each do |t|
t.join(1)
- yield
+ yield
end
end
end
-
+
#
# Quit - empty the queue and wait for remaining jobs to complete
- #
+ #
def quit
i = 0
@group.keys.each do |group|
wait_until_done(group) do
if i < 30
@@ -104,86 +104,86 @@
else
break
end
end
end
-
+
@threads.each { |t| t.kill }
puts " done."
end
-
+
#
# Run all threads forever (used by chimpd)
#
def run_threads
- @threads.each do |t|
+ @threads.each do |t|
t.join(5)
end
end
-
+
#
# return the total number of queued (non-executing) objects
#
def size
s = 0
@group.values.each do |group|
s += group.size
end
return(s)
end
-
+
#
# Allow the groups to be accessed as ChimpQueue.group[:foo]
#
def self.[](group)
return ChimpQueue.instance.group[group]
end
-
+
def self.[]=(k,v)
ChimpQueue.instance.group[k] = v
end
-
+
#
# Return an array of all jobs with the requested
# status.
#
def get_jobs_by_status(status)
r = []
- @group.values.each do |group|
+ @group.values.each do |group|
v = group.get_jobs_by_status(status)
if v != nil and v != []
r += v
end
end
-
+
return r
end
-
+
def get_job(id)
jobs = self.get_jobs
-
+
jobs.each do |j|
return j if j.job_id == id
end
end
-
+
def get_jobs
r = []
@group.values.each do |group|
group.get_jobs.each { |job| r << job }
end
-
+
return r
end
-
- #############################################################
+
+ #############################################################
protected
-
+
#
# Sort all the things, er, queues
#
def sort_queues!
return @group.values.each { |group| group.sort! }
end
-
+
end
end