lib/mini_scheduler/manager.rb in mini_scheduler-0.9.2 vs lib/mini_scheduler/manager.rb in mini_scheduler-0.10.0
- old
+ new
@@ -1,8 +1,8 @@
module MiniScheduler
class Manager
- attr_accessor :random_ratio, :redis, :enable_stats
+ attr_accessor :random_ratio, :redis, :enable_stats, :queue
class Runner
def initialize(manager)
@stopped = false
@mutex = Mutex.new
@@ -166,15 +166,17 @@
def self.without_runner
self.new(skip_runner: true)
end
def initialize(options = nil)
+ @queue = options && options[:queue] || "default"
+
@redis = MiniScheduler.redis
@random_ratio = 0.1
unless options && options[:skip_runner]
@runner = Runner.new(self)
- self.class.current = self
+ self.class.current[@queue] = self
end
@hostname = options && options[:hostname]
@manager_id = SecureRandom.hex
@@ -184,17 +186,13 @@
@enable_stats = !!defined?(MiniScheduler::Stat)
end
end
def self.current
- @current
+ @current ||= {}
end
- def self.current=(manager)
- @current = manager
- end
-
def hostname
@hostname ||= `hostname`.strip
end
def schedule_info(klass)
@@ -223,11 +221,11 @@
reschedule_orphans_on!(hostname)
end
end
def reschedule_orphans_on!(hostname = nil)
- redis.zrange(Manager.queue_key(hostname), 0, -1).each do |key|
+ redis.zrange(Manager.queue_key(queue, hostname), 0, -1).each do |key|
klass = get_klass(key)
next unless klass
info = schedule_info(klass)
if ['QUEUED', 'RUNNING'].include?(info.prev_result) &&
@@ -251,18 +249,18 @@
schedule_next_job(hostname)
end
end
def schedule_next_job(hostname = nil)
- (key, due), _ = redis.zrange Manager.queue_key(hostname), 0, 0, withscores: true
+ (key, due), _ = redis.zrange Manager.queue_key(queue, hostname), 0, 0, withscores: true
return unless key
if due.to_i <= Time.now.to_i
klass = get_klass(key)
unless klass
# corrupt key, nuke it (renamed job or something)
- redis.zrem Manager.queue_key(hostname), key
+ redis.zrem Manager.queue_key(queue, hostname), key
return
end
info = schedule_info(klass)
info.prev_run = Time.now.to_i
info.prev_result = "QUEUED"
@@ -279,11 +277,11 @@
@runner.wait_till_done
end
def stop!
@runner.stop!
- self.class.current = nil
+ self.class.current.delete(@queue)
end
def keep_alive_duration
60
end
@@ -291,15 +289,19 @@
def keep_alive
redis.setex identity_key, keep_alive_duration, ""
end
def lock
- MiniScheduler::DistributedMutex.synchronize(Manager.lock_key, MiniScheduler.redis) do
+ MiniScheduler::DistributedMutex.synchronize(Manager.lock_key(queue), MiniScheduler.redis) do
yield
end
end
+ def self.discover_queues
+ ObjectSpace.each_object(MiniScheduler::Schedule).map(&:queue).to_set
+ end
+
def self.discover_schedules
# hack for developemnt reloader is crazytown
# multiple classes with same name can be in
# object space
unique = Set.new
@@ -324,18 +326,18 @@
def identity_key
@identity_key ||= "_scheduler_#{hostname}:#{Process.pid}:#{self.class.seq}:#{SecureRandom.hex}"
end
- def self.lock_key
- "_scheduler_lock_"
+ def self.lock_key(queue)
+ "_scheduler_lock_#{queue}_"
end
- def self.queue_key(hostname = nil)
+ def self.queue_key(queue, hostname = nil)
if hostname
- "_scheduler_queue_#{hostname}_"
+ "_scheduler_queue_#{queue}_#{hostname}_"
else
- "_scheduler_queue_"
+ "_scheduler_queue_#{queue}_"
end
end
def self.schedule_key(klass, hostname = nil)
if hostname