lib/mini_scheduler/manager.rb in mini_scheduler-0.12.2 vs lib/mini_scheduler/manager.rb in mini_scheduler-0.12.3
- old
+ new
@@ -10,15 +10,16 @@
@mutex = Mutex.new
@queue = Queue.new
@manager = manager
@hostname = manager.hostname
- @reschedule_orphans_thread = Thread.new do
+ @recovery_thread = Thread.new do
while !@stopped
sleep 60
@mutex.synchronize do
+ repair_queue
reschedule_orphans
end
end
end
@keep_alive_thread = Thread.new do
@@ -43,10 +44,16 @@
@manager.keep_alive
rescue => ex
MiniScheduler.handle_job_exception(ex, message: "Scheduling manager keep-alive")
end
+ def repair_queue
+ @manager.repair_queue
+ rescue => ex
+ MiniScheduler.handle_job_exception(ex, message: "Scheduling manager queue repair")
+ end
+
def reschedule_orphans
@manager.reschedule_orphans!
rescue => ex
MiniScheduler.handle_job_exception(ex, message: "Scheduling manager orphan rescheduler")
end
@@ -120,14 +127,14 @@
@mutex.synchronize do
@stopped = true
@keep_alive_thread.kill
- @reschedule_orphans_thread.kill
+ @recovery_thread.kill
@keep_alive_thread.join
- @reschedule_orphans_thread.join
+ @recovery_thread.join
enq(nil)
kill_thread = Thread.new do
sleep 0.5
@@ -248,9 +255,18 @@
def get_klass(name)
name.constantize
rescue NameError
nil
+ end
+
+ def repair_queue
+ return if redis.exists?(self.class.queue_key(queue)) ||
+ redis.exists?(self.class.queue_key(queue, hostname))
+
+ self.class.discover_schedules
+ .select { |schedule| schedule.queue == queue }
+ .each { |schedule| ensure_schedule!(schedule) }
end
def tick
lock do
schedule_next_job