lib/mini_scheduler/manager.rb in mini_scheduler-0.16.0 vs lib/mini_scheduler/manager.rb in mini_scheduler-0.17.0

- old
+ new

@@ -10,29 +10,31 @@ @mutex = Mutex.new @queue = Queue.new @manager = manager @hostname = manager.hostname - @recovery_thread = Thread.new do - while !@stopped - sleep 60 + @recovery_thread = + Thread.new do + while !@stopped + sleep 60 - @mutex.synchronize do - repair_queue - reschedule_orphans - ensure_worker_threads + @mutex.synchronize do + repair_queue + reschedule_orphans + ensure_worker_threads + end end end - end - @keep_alive_thread = Thread.new do - while !@stopped - @mutex.synchronize do - keep_alive + + @keep_alive_thread = + Thread.new do + while !@stopped + @mutex.synchronize { keep_alive } + sleep(@manager.keep_alive_duration / 2) end - sleep (@manager.keep_alive_duration / 2) end - end + ensure_worker_threads end def keep_alive(*ids) @manager.keep_alive(*ids) @@ -47,31 +49,40 @@ end def reschedule_orphans @manager.reschedule_orphans! rescue => ex - MiniScheduler.handle_job_exception(ex, message: "Error during MiniScheduler reschedule_orphans") + MiniScheduler.handle_job_exception( + ex, + message: "Error during MiniScheduler reschedule_orphans", + ) end def ensure_worker_threads @threads ||= [] @threads.delete_if { |t| !t.alive? } - (@manager.workers - @threads.size).times do - @threads << Thread.new { worker_loop } - end + (@manager.workers - @threads.size).times { @threads << Thread.new { worker_loop } } rescue => ex - MiniScheduler.handle_job_exception(ex, message: "Error during MiniScheduler ensure_worker_threads") + MiniScheduler.handle_job_exception( + ex, + message: "Error during MiniScheduler ensure_worker_threads", + ) end def worker_loop set_current_worker_thread_id! keep_alive(current_worker_thread_id) + while !@stopped begin process_queue rescue => ex - MiniScheduler.handle_job_exception(ex, message: "Error during MiniScheduler worker_loop") + MiniScheduler.handle_job_exception( + ex, + message: "Error during MiniScheduler worker_loop", + ) + break # Data could be in a bad state - stop the thread end end end @@ -82,19 +93,22 @@ def current_worker_thread_id Thread.current[:mini_scheduler_worker_thread_id] end def set_current_worker_thread_id! - Thread.current[:mini_scheduler_worker_thread_id] = "#{@manager.identity_key}:thread_#{SecureRandom.alphanumeric(10)}" + Thread.current[ + :mini_scheduler_worker_thread_id + ] = "#{@manager.identity_key}:thread_#{SecureRandom.alphanumeric(10)}" end def worker_thread_ids @threads.filter(&:alive?).filter_map { |t| t[:mini_scheduler_worker_thread_id] } end def process_queue klass = @queue.deq + # hack alert, I need to both deq and set @running atomically. @running = true return if !klass @@ -108,44 +122,51 @@ info.prev_result = "RUNNING" info.current_owner = current_worker_thread_id @mutex.synchronize { info.write! } if @manager.enable_stats - stat = MiniScheduler::Stat.create!( - name: klass.to_s, - hostname: hostname, - pid: Process.pid, - started_at: Time.now, - live_slots_start: GC.stat[:heap_live_slots] - ) + stat = + MiniScheduler::Stat.create!( + name: klass.to_s, + hostname: hostname, + pid: Process.pid, + started_at: Time.now, + live_slots_start: GC.stat[:heap_live_slots], + ) end klass.new.perform rescue => e - MiniScheduler.handle_job_exception(e, message: "Error while running a scheduled job", job: { "class" => klass }) + MiniScheduler.handle_job_exception( + e, + message: "Error while running a scheduled job", + job: { + "class" => klass, + }, + ) error = "#{e.class}: #{e.message} #{e.backtrace.join("\n")}" failed = true end + duration = ((Time.now.to_f - start) * 1000).to_i info.prev_duration = duration info.prev_result = failed ? "FAILED" : "OK" info.current_owner = nil if stat stat.update!( duration_ms: duration, live_slots_finish: GC.stat[:heap_live_slots], success: !failed, - error: error + error: error, ) MiniScheduler.job_ran&.call(stat) end - attempts(3) do - @mutex.synchronize { info.write! } - end + attempts(3) { @mutex.synchronize { info.write! } } ensure @running = false + if defined?(ActiveRecord::Base) ActiveRecord::Base.connection_handler.clear_active_connections! end end @@ -161,14 +182,15 @@ @keep_alive_thread.join @recovery_thread.join enq(nil) - kill_thread = Thread.new do - sleep 0.5 - @threads.each(&:kill) - end + kill_thread = + Thread.new do + sleep 0.5 + @threads.each(&:kill) + end @threads.each(&:join) kill_thread.kill kill_thread.join end @@ -177,33 +199,28 @@ def enq(klass) @queue << klass end def wait_till_done - while !@queue.empty? && !(@queue.num_waiting > 0) - sleep 0.001 - end + sleep 0.001 while !@queue.empty? && !(@queue.num_waiting > 0) # this is a hack, but is only used for test anyway # if tests fail that depend on this we are forced to increase it. sleep 0.010 - while @running - sleep 0.001 - end + sleep 0.001 while @running end def attempts(max_attempts) attempt = 0 begin yield - rescue + rescue StandardError attempt += 1 raise if attempt >= max_attempts sleep Random.rand retry end end - end def self.without_runner self.new(skip_runner: true) end @@ -231,15 +248,11 @@ def self.current @current ||= {} end def hostname - @hostname ||= begin - `hostname`.strip - rescue - "unknown" - end + @hostname ||= self.class.hostname end def schedule_info(klass) MiniScheduler::ScheduleInfo.new(klass, self) end @@ -247,54 +260,56 @@ def next_run(klass) schedule_info(klass).next_run end def ensure_schedule!(klass) - lock do - schedule_info(klass).schedule! - end + lock { schedule_info(klass).schedule! } end def remove(klass) - lock do - schedule_info(klass).del! - end + lock { schedule_info(klass).del! } end def reschedule_orphans! lock do reschedule_orphans_on! reschedule_orphans_on!(hostname) end end def reschedule_orphans_on!(hostname = nil) - redis.zrange(Manager.queue_key(queue, hostname), 0, -1).each do |key| - klass = get_klass(key) - next unless klass - info = schedule_info(klass) + redis + .zrange(Manager.queue_key(queue, hostname), 0, -1) + .each do |key| + klass = get_klass(key) + next unless klass + info = schedule_info(klass) - if ['QUEUED', 'RUNNING'].include?(info.prev_result) && - (info.current_owner.blank? || !redis.get(info.current_owner)) - info.prev_result = 'ORPHAN' - info.next_run = Time.now.to_i - info.write! + if %w[QUEUED RUNNING].include?(info.prev_result) && + (!info.current_owner || !redis.get(info.current_owner)) + info.prev_result = "ORPHAN" + info.next_run = Time.now.to_i + info.write! + end end - end end def get_klass(name) name.constantize rescue NameError nil end def repair_queue - return if redis.exists?(self.class.queue_key(queue)) || - redis.exists?(self.class.queue_key(queue, hostname)) + if redis.exists?(self.class.queue_key(queue)) || + redis.exists?(self.class.queue_key(queue, hostname)) + return + end - self.class.discover_schedules + self + .class + .discover_schedules .select { |schedule| schedule.queue == queue } .each { |schedule| ensure_schedule!(schedule) } end def tick @@ -308,13 +323,11 @@ (key, due), _ = redis.zrange Manager.queue_key(queue, hostname), 0, 0, withscores: true return unless key if due.to_i <= Time.now.to_i klass = get_klass(key) - if !klass || ( - (klass.is_per_host && !hostname) || (hostname && !klass.is_per_host) - ) + if !klass || ((klass.is_per_host && !hostname) || (hostname && !klass.is_per_host)) # corrupt key, nuke it (renamed job or something) redis.zrem Manager.queue_key(queue, hostname), key return end @@ -343,23 +356,23 @@ 60 end def keep_alive(*ids) ids = [identity_key, *@runner.worker_thread_ids] if ids.size == 0 - ids.each do |identity_key| - redis.setex identity_key, keep_alive_duration, "" - end + ids.each { |identity_key| redis.setex identity_key, keep_alive_duration, "" } end def lock MiniScheduler::DistributedMutex.synchronize(Manager.lock_key(queue), MiniScheduler.redis) do yield end end def self.discover_queues - ObjectSpace.each_object(MiniScheduler::Schedule).map(&:queue).to_set + queues = Set.new + ObjectSpace.each_object(MiniScheduler::Schedule).each { |schedule| queues << schedule.queue } + queues end def self.discover_schedules # hack for developemnt reloader is crazytown # multiple classes with same name can be in @@ -374,10 +387,77 @@ end end schedules end + def self.hostname + @hostname ||= + begin + require "socket" + Socket.gethostname + rescue => e + begin + `hostname`.strip + rescue => e + "unknown_host" + end + end + end + + # Discover running scheduled jobs on the current host. + # + # @example + # + # MiniScheduler::Manager.discover_running_scheduled_jobs + # + # @return [Array<Hash>] an array of hashes representing the running scheduled jobs. + # @option job [Class] :class The class of the scheduled job. + # @option job [Time] :started_at The time when the scheduled job started. + # @option job [String] :thread_id The ID of the worker thread running the job. + # The thread can be identified by matching the `:mini_scheduler_worker_thread_id` thread variable with the ID. + def self.discover_running_scheduled_jobs + hostname = self.hostname + + schedule_keys = + discover_schedules.reduce({}) do |acc, klass| + acc[klass] = if klass.is_per_host + self.schedule_key(klass, hostname) + else + self.schedule_key(klass) + end + + acc + end + + running_scheduled_jobs = [] + + schedule_keys + .keys + .zip(MiniScheduler.redis.mget(*schedule_keys.values)) + .each do |scheduled_job_class, scheduled_job_info| + next if scheduled_job_info.nil? + + parsed = + begin + JSON.parse(scheduled_job_info, symbolize_names: true) + rescue JSON::ParserError + nil + end + + next if parsed.nil? + next if parsed[:prev_result] != "RUNNING" + + running_scheduled_jobs << { + class: scheduled_job_class, + started_at: Time.at(parsed[:prev_run]), + thread_id: parsed[:current_owner], + } + end + + running_scheduled_jobs + end + @class_mutex = Mutex.new def self.seq @class_mutex.synchronize do @i ||= 0 @i += 1 @@ -386,30 +466,23 @@ @@identity_key_mutex = Mutex.new def identity_key return @identity_key if @identity_key @@identity_key_mutex.synchronize do - @identity_key ||= "_scheduler_#{hostname}:#{Process.pid}:#{self.class.seq}:#{SecureRandom.hex}" + @identity_key ||= + "_scheduler_#{hostname}:#{Process.pid}:#{self.class.seq}:#{SecureRandom.hex}" end end def self.lock_key(queue) "_scheduler_lock_#{queue}_" end def self.queue_key(queue, hostname = nil) - if hostname - "_scheduler_queue_#{queue}_#{hostname}_" - else - "_scheduler_queue_#{queue}_" - end + hostname ? "_scheduler_queue_#{queue}_#{hostname}_" : "_scheduler_queue_#{queue}_" end def self.schedule_key(klass, hostname = nil) - if hostname - "_scheduler_#{klass}_#{hostname}" - else - "_scheduler_#{klass}" - end + hostname ? "_scheduler_#{klass}_#{hostname}" : "_scheduler_#{klass}" end end end