lib/mini_scheduler/manager.rb in mini_scheduler-0.14.0 vs lib/mini_scheduler/manager.rb in mini_scheduler-0.15.0
- old
+ new
@@ -17,10 +17,11 @@
sleep 60
@mutex.synchronize do
repair_queue
reschedule_orphans
+ ensure_worker_threads
end
end
end
@keep_alive_thread = Thread.new do
while !@stopped
@@ -28,44 +29,71 @@
keep_alive
end
sleep (@manager.keep_alive_duration / 2)
end
end
- @threads = []
- manager.workers.times do
- @threads << Thread.new do
- while !@stopped
- process_queue
- end
- end
- end
+ ensure_worker_threads
end
- def keep_alive
- @manager.keep_alive
+ def keep_alive(*ids)
+ @manager.keep_alive(*ids)
rescue => ex
- MiniScheduler.handle_job_exception(ex, message: "Scheduling manager keep-alive")
+ MiniScheduler.handle_job_exception(ex, message: "Error during MiniScheduler keep)alive")
end
def repair_queue
@manager.repair_queue
rescue => ex
- MiniScheduler.handle_job_exception(ex, message: "Scheduling manager queue repair")
+ MiniScheduler.handle_job_exception(ex, message: "Error during MiniScheduler repair_queue")
end
def reschedule_orphans
@manager.reschedule_orphans!
rescue => ex
- MiniScheduler.handle_job_exception(ex, message: "Scheduling manager orphan rescheduler")
+ MiniScheduler.handle_job_exception(ex, message: "Error during MiniScheduler reschedule_orphans")
end
+ def ensure_worker_threads
+ @threads ||= []
+ @threads.delete_if { |t| !t.alive? }
+ (@manager.workers - @threads.size).times do
+ @threads << Thread.new { worker_loop }
+ end
+ rescue => ex
+ MiniScheduler.handle_job_exception(ex, message: "Error during MiniScheduler ensure_worker_threads")
+ end
+
+ def worker_loop
+ set_current_worker_thread_id!
+ keep_alive(current_worker_thread_id)
+ while !@stopped
+ begin
+ process_queue
+ rescue => ex
+ MiniScheduler.handle_job_exception(ex, message: "Error during MiniScheduler worker_loop")
+ break # Data could be in a bad state - stop the thread
+ end
+ end
+ end
+
def hostname
@hostname
end
- def process_queue
+ def current_worker_thread_id
+ Thread.current[:mini_scheduler_worker_thread_id]
+ end
+ def set_current_worker_thread_id!
+ Thread.current[:mini_scheduler_worker_thread_id] = "#{@manager.identity_key}:thread_#{SecureRandom.alphanumeric(10)}"
+ end
+
+ def worker_thread_ids
+ @threads.filter(&:alive?).filter_map { |t| t[:mini_scheduler_worker_thread_id] }
+ end
+
+ def process_queue
klass = @queue.deq
# hack alert, I need to both deq and set @running atomically.
@running = true
return if !klass
@@ -76,10 +104,11 @@
stat = nil
error = nil
begin
info.prev_result = "RUNNING"
+ info.current_owner = current_worker_thread_id
@mutex.synchronize { info.write! }
if @manager.enable_stats
stat = MiniScheduler::Stat.create!(
name: klass.to_s,
@@ -90,11 +119,11 @@
)
end
klass.new.perform
rescue => e
- MiniScheduler.handle_job_exception(e, message: "Running a scheduled job", job: { "class" => klass })
+ MiniScheduler.handle_job_exception(e, message: "Error while running a scheduled job", job: { "class" => klass })
error = "#{e.class}: #{e.message} #{e.backtrace.join("\n")}"
failed = true
end
duration = ((Time.now.to_f - start) * 1000).to_i
@@ -111,12 +140,10 @@
MiniScheduler.job_ran&.call(stat)
end
attempts(3) do
@mutex.synchronize { info.write! }
end
- rescue => ex
- MiniScheduler.handle_job_exception(ex, message: "Processing scheduled job queue")
ensure
@running = false
if defined?(ActiveRecord::Base)
ActiveRecord::Base.connection_handler.clear_active_connections!
end
@@ -161,18 +188,20 @@
while @running
sleep 0.001
end
end
- def attempts(n)
- n.times {
- begin
- yield; break
- rescue
- sleep Random.rand
- end
- }
+ def attempts(max_attempts)
+ attempt = 0
+ begin
+ yield
+ rescue
+ attempt += 1
+ raise if attempt >= max_attempts
+ sleep Random.rand
+ retry
+ end
end
end
def self.without_runner
@@ -312,12 +341,15 @@
def keep_alive_duration
60
end
- def keep_alive
- redis.setex identity_key, keep_alive_duration, ""
+ def keep_alive(*ids)
+ ids = [identity_key, *@runner.worker_thread_ids] if ids.size == 0
+ ids.each do |identity_key|
+ redis.setex identity_key, keep_alive_duration, ""
+ end
end
def lock
MiniScheduler::DistributedMutex.synchronize(Manager.lock_key(queue), MiniScheduler.redis) do
yield
@@ -342,19 +374,23 @@
end
end
schedules
end
- @mutex = Mutex.new
+ @class_mutex = Mutex.new
def self.seq
- @mutex.synchronize do
+ @class_mutex.synchronize do
@i ||= 0
@i += 1
end
end
+ @@identity_key_mutex = Mutex.new
def identity_key
- @identity_key ||= "_scheduler_#{hostname}:#{Process.pid}:#{self.class.seq}:#{SecureRandom.hex}"
+ return @identity_key if @identity_key
+ @@identity_key_mutex.synchronize do
+ @identity_key ||= "_scheduler_#{hostname}:#{Process.pid}:#{self.class.seq}:#{SecureRandom.hex}"
+ end
end
def self.lock_key(queue)
"_scheduler_lock_#{queue}_"
end