lib/mini_scheduler/manager.rb in mini_scheduler-0.14.0 vs lib/mini_scheduler/manager.rb in mini_scheduler-0.15.0

- old
+ new

@@ -17,10 +17,11 @@ sleep 60 @mutex.synchronize do repair_queue reschedule_orphans + ensure_worker_threads end end end @keep_alive_thread = Thread.new do while !@stopped @@ -28,44 +29,71 @@ keep_alive end sleep (@manager.keep_alive_duration / 2) end end - @threads = [] - manager.workers.times do - @threads << Thread.new do - while !@stopped - process_queue - end - end - end + ensure_worker_threads end - def keep_alive - @manager.keep_alive + def keep_alive(*ids) + @manager.keep_alive(*ids) rescue => ex - MiniScheduler.handle_job_exception(ex, message: "Scheduling manager keep-alive") + MiniScheduler.handle_job_exception(ex, message: "Error during MiniScheduler keep)alive") end def repair_queue @manager.repair_queue rescue => ex - MiniScheduler.handle_job_exception(ex, message: "Scheduling manager queue repair") + MiniScheduler.handle_job_exception(ex, message: "Error during MiniScheduler repair_queue") end def reschedule_orphans @manager.reschedule_orphans! rescue => ex - MiniScheduler.handle_job_exception(ex, message: "Scheduling manager orphan rescheduler") + MiniScheduler.handle_job_exception(ex, message: "Error during MiniScheduler reschedule_orphans") end + def ensure_worker_threads + @threads ||= [] + @threads.delete_if { |t| !t.alive? } + (@manager.workers - @threads.size).times do + @threads << Thread.new { worker_loop } + end + rescue => ex + MiniScheduler.handle_job_exception(ex, message: "Error during MiniScheduler ensure_worker_threads") + end + + def worker_loop + set_current_worker_thread_id! + keep_alive(current_worker_thread_id) + while !@stopped + begin + process_queue + rescue => ex + MiniScheduler.handle_job_exception(ex, message: "Error during MiniScheduler worker_loop") + break # Data could be in a bad state - stop the thread + end + end + end + def hostname @hostname end - def process_queue + def current_worker_thread_id + Thread.current[:mini_scheduler_worker_thread_id] + end + def set_current_worker_thread_id! + Thread.current[:mini_scheduler_worker_thread_id] = "#{@manager.identity_key}:thread_#{SecureRandom.alphanumeric(10)}" + end + + def worker_thread_ids + @threads.filter(&:alive?).filter_map { |t| t[:mini_scheduler_worker_thread_id] } + end + + def process_queue klass = @queue.deq # hack alert, I need to both deq and set @running atomically. @running = true return if !klass @@ -76,10 +104,11 @@ stat = nil error = nil begin info.prev_result = "RUNNING" + info.current_owner = current_worker_thread_id @mutex.synchronize { info.write! } if @manager.enable_stats stat = MiniScheduler::Stat.create!( name: klass.to_s, @@ -90,11 +119,11 @@ ) end klass.new.perform rescue => e - MiniScheduler.handle_job_exception(e, message: "Running a scheduled job", job: { "class" => klass }) + MiniScheduler.handle_job_exception(e, message: "Error while running a scheduled job", job: { "class" => klass }) error = "#{e.class}: #{e.message} #{e.backtrace.join("\n")}" failed = true end duration = ((Time.now.to_f - start) * 1000).to_i @@ -111,12 +140,10 @@ MiniScheduler.job_ran&.call(stat) end attempts(3) do @mutex.synchronize { info.write! } end - rescue => ex - MiniScheduler.handle_job_exception(ex, message: "Processing scheduled job queue") ensure @running = false if defined?(ActiveRecord::Base) ActiveRecord::Base.connection_handler.clear_active_connections! end @@ -161,18 +188,20 @@ while @running sleep 0.001 end end - def attempts(n) - n.times { - begin - yield; break - rescue - sleep Random.rand - end - } + def attempts(max_attempts) + attempt = 0 + begin + yield + rescue + attempt += 1 + raise if attempt >= max_attempts + sleep Random.rand + retry + end end end def self.without_runner @@ -312,12 +341,15 @@ def keep_alive_duration 60 end - def keep_alive - redis.setex identity_key, keep_alive_duration, "" + def keep_alive(*ids) + ids = [identity_key, *@runner.worker_thread_ids] if ids.size == 0 + ids.each do |identity_key| + redis.setex identity_key, keep_alive_duration, "" + end end def lock MiniScheduler::DistributedMutex.synchronize(Manager.lock_key(queue), MiniScheduler.redis) do yield @@ -342,19 +374,23 @@ end end schedules end - @mutex = Mutex.new + @class_mutex = Mutex.new def self.seq - @mutex.synchronize do + @class_mutex.synchronize do @i ||= 0 @i += 1 end end + @@identity_key_mutex = Mutex.new def identity_key - @identity_key ||= "_scheduler_#{hostname}:#{Process.pid}:#{self.class.seq}:#{SecureRandom.hex}" + return @identity_key if @identity_key + @@identity_key_mutex.synchronize do + @identity_key ||= "_scheduler_#{hostname}:#{Process.pid}:#{self.class.seq}:#{SecureRandom.hex}" + end end def self.lock_key(queue) "_scheduler_lock_#{queue}_" end