lib/mini_scheduler/manager.rb in mini_scheduler-0.9.2 vs lib/mini_scheduler/manager.rb in mini_scheduler-0.10.0

- old
+ new

@@ -1,8 +1,8 @@ module MiniScheduler class Manager - attr_accessor :random_ratio, :redis, :enable_stats + attr_accessor :random_ratio, :redis, :enable_stats, :queue class Runner def initialize(manager) @stopped = false @mutex = Mutex.new @@ -166,15 +166,17 @@ def self.without_runner self.new(skip_runner: true) end def initialize(options = nil) + @queue = options && options[:queue] || "default" + @redis = MiniScheduler.redis @random_ratio = 0.1 unless options && options[:skip_runner] @runner = Runner.new(self) - self.class.current = self + self.class.current[@queue] = self end @hostname = options && options[:hostname] @manager_id = SecureRandom.hex @@ -184,17 +186,13 @@ @enable_stats = !!defined?(MiniScheduler::Stat) end end def self.current - @current + @current ||= {} end - def self.current=(manager) - @current = manager - end - def hostname @hostname ||= `hostname`.strip end def schedule_info(klass) @@ -223,11 +221,11 @@ reschedule_orphans_on!(hostname) end end def reschedule_orphans_on!(hostname = nil) - redis.zrange(Manager.queue_key(hostname), 0, -1).each do |key| + redis.zrange(Manager.queue_key(queue, hostname), 0, -1).each do |key| klass = get_klass(key) next unless klass info = schedule_info(klass) if ['QUEUED', 'RUNNING'].include?(info.prev_result) && @@ -251,18 +249,18 @@ schedule_next_job(hostname) end end def schedule_next_job(hostname = nil) - (key, due), _ = redis.zrange Manager.queue_key(hostname), 0, 0, withscores: true + (key, due), _ = redis.zrange Manager.queue_key(queue, hostname), 0, 0, withscores: true return unless key if due.to_i <= Time.now.to_i klass = get_klass(key) unless klass # corrupt key, nuke it (renamed job or something) - redis.zrem Manager.queue_key(hostname), key + redis.zrem Manager.queue_key(queue, hostname), key return end info = schedule_info(klass) info.prev_run = Time.now.to_i info.prev_result = "QUEUED" @@ -279,11 +277,11 @@ @runner.wait_till_done end def stop! @runner.stop! - self.class.current = nil + self.class.current.delete(@queue) end def keep_alive_duration 60 end @@ -291,15 +289,19 @@ def keep_alive redis.setex identity_key, keep_alive_duration, "" end def lock - MiniScheduler::DistributedMutex.synchronize(Manager.lock_key, MiniScheduler.redis) do + MiniScheduler::DistributedMutex.synchronize(Manager.lock_key(queue), MiniScheduler.redis) do yield end end + def self.discover_queues + ObjectSpace.each_object(MiniScheduler::Schedule).map(&:queue).to_set + end + def self.discover_schedules # hack for developemnt reloader is crazytown # multiple classes with same name can be in # object space unique = Set.new @@ -324,18 +326,18 @@ def identity_key @identity_key ||= "_scheduler_#{hostname}:#{Process.pid}:#{self.class.seq}:#{SecureRandom.hex}" end - def self.lock_key - "_scheduler_lock_" + def self.lock_key(queue) + "_scheduler_lock_#{queue}_" end - def self.queue_key(hostname = nil) + def self.queue_key(queue, hostname = nil) if hostname - "_scheduler_queue_#{hostname}_" + "_scheduler_queue_#{queue}_#{hostname}_" else - "_scheduler_queue_" + "_scheduler_queue_#{queue}_" end end def self.schedule_key(klass, hostname = nil) if hostname