lib/mini_scheduler/manager.rb in mini_scheduler-0.16.0 vs lib/mini_scheduler/manager.rb in mini_scheduler-0.17.0
- old
+ new
@@ -10,29 +10,31 @@
@mutex = Mutex.new
@queue = Queue.new
@manager = manager
@hostname = manager.hostname
- @recovery_thread = Thread.new do
- while !@stopped
- sleep 60
+ @recovery_thread =
+ Thread.new do
+ while !@stopped
+ sleep 60
- @mutex.synchronize do
- repair_queue
- reschedule_orphans
- ensure_worker_threads
+ @mutex.synchronize do
+ repair_queue
+ reschedule_orphans
+ ensure_worker_threads
+ end
end
end
- end
- @keep_alive_thread = Thread.new do
- while !@stopped
- @mutex.synchronize do
- keep_alive
+
+ @keep_alive_thread =
+ Thread.new do
+ while !@stopped
+ @mutex.synchronize { keep_alive }
+ sleep(@manager.keep_alive_duration / 2)
end
- sleep (@manager.keep_alive_duration / 2)
end
- end
+
ensure_worker_threads
end
def keep_alive(*ids)
@manager.keep_alive(*ids)
@@ -47,31 +49,40 @@
end
def reschedule_orphans
@manager.reschedule_orphans!
rescue => ex
- MiniScheduler.handle_job_exception(ex, message: "Error during MiniScheduler reschedule_orphans")
+ MiniScheduler.handle_job_exception(
+ ex,
+ message: "Error during MiniScheduler reschedule_orphans",
+ )
end
def ensure_worker_threads
@threads ||= []
@threads.delete_if { |t| !t.alive? }
- (@manager.workers - @threads.size).times do
- @threads << Thread.new { worker_loop }
- end
+ (@manager.workers - @threads.size).times { @threads << Thread.new { worker_loop } }
rescue => ex
- MiniScheduler.handle_job_exception(ex, message: "Error during MiniScheduler ensure_worker_threads")
+ MiniScheduler.handle_job_exception(
+ ex,
+ message: "Error during MiniScheduler ensure_worker_threads",
+ )
end
def worker_loop
set_current_worker_thread_id!
keep_alive(current_worker_thread_id)
+
while !@stopped
begin
process_queue
rescue => ex
- MiniScheduler.handle_job_exception(ex, message: "Error during MiniScheduler worker_loop")
+ MiniScheduler.handle_job_exception(
+ ex,
+ message: "Error during MiniScheduler worker_loop",
+ )
+
break # Data could be in a bad state - stop the thread
end
end
end
@@ -82,19 +93,22 @@
# Returns the value stored under :mini_scheduler_worker_thread_id on
# Thread.current, or nil when nothing has been stored there.
def current_worker_thread_id
  running_thread = Thread.current
  running_thread[:mini_scheduler_worker_thread_id]
end
def set_current_worker_thread_id!
- Thread.current[:mini_scheduler_worker_thread_id] = "#{@manager.identity_key}:thread_#{SecureRandom.alphanumeric(10)}"
+ Thread.current[
+ :mini_scheduler_worker_thread_id
+ ] = "#{@manager.identity_key}:thread_#{SecureRandom.alphanumeric(10)}"
end
# Collects the :mini_scheduler_worker_thread_id of every thread in @threads
# that is still alive, skipping threads that have no id stored.
#
# @return [Array] the ids, in the order the threads appear in @threads
def worker_thread_ids
  @threads.each_with_object([]) do |thread, ids|
    next unless thread.alive?

    worker_id = thread[:mini_scheduler_worker_thread_id]
    ids << worker_id if worker_id
  end
end
def process_queue
klass = @queue.deq
+
# hack alert, I need to both deq and set @running atomically.
@running = true
return if !klass
@@ -108,44 +122,51 @@
info.prev_result = "RUNNING"
info.current_owner = current_worker_thread_id
@mutex.synchronize { info.write! }
if @manager.enable_stats
- stat = MiniScheduler::Stat.create!(
- name: klass.to_s,
- hostname: hostname,
- pid: Process.pid,
- started_at: Time.now,
- live_slots_start: GC.stat[:heap_live_slots]
- )
+ stat =
+ MiniScheduler::Stat.create!(
+ name: klass.to_s,
+ hostname: hostname,
+ pid: Process.pid,
+ started_at: Time.now,
+ live_slots_start: GC.stat[:heap_live_slots],
+ )
end
klass.new.perform
rescue => e
- MiniScheduler.handle_job_exception(e, message: "Error while running a scheduled job", job: { "class" => klass })
+ MiniScheduler.handle_job_exception(
+ e,
+ message: "Error while running a scheduled job",
+ job: {
+ "class" => klass,
+ },
+ )
error = "#{e.class}: #{e.message} #{e.backtrace.join("\n")}"
failed = true
end
+
duration = ((Time.now.to_f - start) * 1000).to_i
info.prev_duration = duration
info.prev_result = failed ? "FAILED" : "OK"
info.current_owner = nil
if stat
stat.update!(
duration_ms: duration,
live_slots_finish: GC.stat[:heap_live_slots],
success: !failed,
- error: error
+ error: error,
)
MiniScheduler.job_ran&.call(stat)
end
- attempts(3) do
- @mutex.synchronize { info.write! }
- end
+ attempts(3) { @mutex.synchronize { info.write! } }
ensure
@running = false
+
if defined?(ActiveRecord::Base)
ActiveRecord::Base.connection_handler.clear_active_connections!
end
end
@@ -161,14 +182,15 @@
@keep_alive_thread.join
@recovery_thread.join
enq(nil)
- kill_thread = Thread.new do
- sleep 0.5
- @threads.each(&:kill)
- end
+ kill_thread =
+ Thread.new do
+ sleep 0.5
+ @threads.each(&:kill)
+ end
@threads.each(&:join)
kill_thread.kill
kill_thread.join
end
@@ -177,33 +199,28 @@
# Appends klass to the end of @queue.
#
# @param klass [Object, nil] the entry to enqueue
# @return the queue itself
def enq(klass)
  @queue.push(klass)
end
def wait_till_done
- while !@queue.empty? && !(@queue.num_waiting > 0)
- sleep 0.001
- end
+ sleep 0.001 while !@queue.empty? && !(@queue.num_waiting > 0)
# this is a hack, but is only used for test anyway
# if tests fail that depend on this we are forced to increase it.
sleep 0.010
- while @running
- sleep 0.001
- end
+ sleep 0.001 while @running
end
def attempts(max_attempts)
attempt = 0
begin
yield
- rescue
+ rescue StandardError
attempt += 1
raise if attempt >= max_attempts
sleep Random.rand
retry
end
end
-
end
# Builds a new instance, passing skip_runner: true to the constructor.
def self.without_runner
  new(skip_runner: true)
end
@@ -231,15 +248,11 @@
# Returns the hash held in the class-level @current variable, creating an
# empty one the first time it is requested.
#
# @return [Hash]
def self.current
  return @current if @current

  @current = {}
end
def hostname
- @hostname ||= begin
- `hostname`.strip
- rescue
- "unknown"
- end
+ @hostname ||= self.class.hostname
end
# Wraps klass in a MiniScheduler::ScheduleInfo bound to this manager.
#
# @param klass the scheduled job class
# @return [MiniScheduler::ScheduleInfo]
def schedule_info(klass)
  info_class = MiniScheduler::ScheduleInfo
  info_class.new(klass, self)
end
@@ -247,54 +260,56 @@
# Returns the next_run value reported by the schedule info of klass.
#
# @param klass the scheduled job class
def next_run(klass)
  info = schedule_info(klass)
  info.next_run
end
def ensure_schedule!(klass)
- lock do
- schedule_info(klass).schedule!
- end
+ lock { schedule_info(klass).schedule! }
end
def remove(klass)
- lock do
- schedule_info(klass).del!
- end
+ lock { schedule_info(klass).del! }
end
# Runs both orphan-rescheduling passes inside a single `lock` block:
# first the pass without a hostname, then the pass for this manager's hostname.
def reschedule_orphans!
  lock do
    # Pass without a hostname argument.
    reschedule_orphans_on!

    # Pass restricted to this manager's own hostname.
    own_host = hostname
    reschedule_orphans_on!(own_host)
  end
end
def reschedule_orphans_on!(hostname = nil)
- redis.zrange(Manager.queue_key(queue, hostname), 0, -1).each do |key|
- klass = get_klass(key)
- next unless klass
- info = schedule_info(klass)
+ redis
+ .zrange(Manager.queue_key(queue, hostname), 0, -1)
+ .each do |key|
+ klass = get_klass(key)
+ next unless klass
+ info = schedule_info(klass)
- if ['QUEUED', 'RUNNING'].include?(info.prev_result) &&
- (info.current_owner.blank? || !redis.get(info.current_owner))
- info.prev_result = 'ORPHAN'
- info.next_run = Time.now.to_i
- info.write!
+ if %w[QUEUED RUNNING].include?(info.prev_result) &&
+ (!info.current_owner || !redis.get(info.current_owner))
+ info.prev_result = "ORPHAN"
+ info.next_run = Time.now.to_i
+ info.write!
+ end
end
- end
end
# Resolves a constant name to the constant it names via `constantize`.
#
# @param name [String] the constant name to resolve
# @return the resolved constant, or nil when resolution raises NameError
def get_klass(name)
  resolved =
    begin
      name.constantize
    rescue NameError
      nil
    end

  resolved
end
def repair_queue
- return if redis.exists?(self.class.queue_key(queue)) ||
- redis.exists?(self.class.queue_key(queue, hostname))
+ if redis.exists?(self.class.queue_key(queue)) ||
+ redis.exists?(self.class.queue_key(queue, hostname))
+ return
+ end
- self.class.discover_schedules
+ self
+ .class
+ .discover_schedules
.select { |schedule| schedule.queue == queue }
.each { |schedule| ensure_schedule!(schedule) }
end
def tick
@@ -308,13 +323,11 @@
(key, due), _ = redis.zrange Manager.queue_key(queue, hostname), 0, 0, withscores: true
return unless key
if due.to_i <= Time.now.to_i
klass = get_klass(key)
- if !klass || (
- (klass.is_per_host && !hostname) || (hostname && !klass.is_per_host)
- )
+ if !klass || ((klass.is_per_host && !hostname) || (hostname && !klass.is_per_host))
# corrupt key, nuke it (renamed job or something)
redis.zrem Manager.queue_key(queue, hostname), key
return
end
@@ -343,23 +356,23 @@
60
end
def keep_alive(*ids)
ids = [identity_key, *@runner.worker_thread_ids] if ids.size == 0
- ids.each do |identity_key|
- redis.setex identity_key, keep_alive_duration, ""
- end
+ ids.each { |identity_key| redis.setex identity_key, keep_alive_duration, "" }
end
# Runs the given block inside MiniScheduler::DistributedMutex.synchronize,
# keyed by this queue's lock key and using MiniScheduler.redis.
#
# @yield the work to perform inside the synchronize call
# @return whatever the synchronize call returns
def lock
  key = Manager.lock_key(queue)
  MiniScheduler::DistributedMutex.synchronize(key, MiniScheduler.redis) { yield }
end
def self.discover_queues
- ObjectSpace.each_object(MiniScheduler::Schedule).map(&:queue).to_set
+ queues = Set.new
+ ObjectSpace.each_object(MiniScheduler::Schedule).each { |schedule| queues << schedule.queue }
+ queues
end
def self.discover_schedules
# hack for development reloader is crazytown
# multiple classes with same name can be in
@@ -374,10 +387,77 @@
end
end
schedules
end
# Resolves the name of the current host and memoizes it at the class level.
#
# Tries, in order:
# 1. Socket.gethostname
# 2. the output of the `hostname` shell command
# 3. the literal "unknown_host"
#
# @return [String] the resolved host name
def self.hostname
  @hostname ||=
    begin
      require "socket"
      Socket.gethostname
    rescue StandardError
      begin
        shell_name = `hostname`.strip
        # An empty answer from the command is no more useful than a failure,
        # so it gets the same fallback instead of being memoized as "".
        shell_name.empty? ? "unknown_host" : shell_name
      rescue StandardError
        "unknown_host"
      end
    end
end
+
# Discover running scheduled jobs on the current host.
#
# Reads the stored schedule info of every discovered schedule with a single
# Redis MGET (per-host schedules are looked up under the key that includes
# this host's name) and reports the entries whose `prev_result` is "RUNNING".
# Entries that are missing, are not valid JSON, or are not JSON objects are
# skipped.
#
# @example
#
#   MiniScheduler::Manager.discover_running_scheduled_jobs
#
# @return [Array<Hash>] one hash per running scheduled job, with the keys:
#   - `:class` the class of the scheduled job
#   - `:started_at` [Time, nil] built from the stored `prev_run`; nil when
#     no `prev_run` was stored
#   - `:thread_id` the stored `current_owner`, i.e. the ID of the worker thread
#     running the job. The thread can be identified by matching the
#     `:mini_scheduler_worker_thread_id` thread variable with the ID.
def self.discover_running_scheduled_jobs
  hostname = self.hostname

  schedule_keys =
    discover_schedules.each_with_object({}) do |klass, keys|
      keys[klass] = if klass.is_per_host
        self.schedule_key(klass, hostname)
      else
        self.schedule_key(klass)
      end
    end

  # MGET needs at least one key; with no schedules there is nothing to ask.
  return [] if schedule_keys.empty?

  stored_infos = MiniScheduler.redis.mget(*schedule_keys.values)

  schedule_keys.keys.zip(stored_infos).filter_map do |scheduled_job_class, scheduled_job_info|
    next if scheduled_job_info.nil?

    parsed =
      begin
        JSON.parse(scheduled_job_info, symbolize_names: true)
      rescue JSON::ParserError
        nil
      end

    # Valid JSON that is not an object (e.g. "null" or "[]") carries no info.
    next unless parsed.is_a?(Hash)
    next if parsed[:prev_result] != "RUNNING"

    started = parsed[:prev_run]

    {
      class: scheduled_job_class,
      started_at: started && Time.at(started),
      thread_id: parsed[:current_owner],
    }
  end
end
+
@class_mutex = Mutex.new
def self.seq
@class_mutex.synchronize do
@i ||= 0
@i += 1
@@ -386,30 +466,23 @@
@@identity_key_mutex = Mutex.new
def identity_key
return @identity_key if @identity_key
@@identity_key_mutex.synchronize do
- @identity_key ||= "_scheduler_#{hostname}:#{Process.pid}:#{self.class.seq}:#{SecureRandom.hex}"
+ @identity_key ||=
+ "_scheduler_#{hostname}:#{Process.pid}:#{self.class.seq}:#{SecureRandom.hex}"
end
end
# Builds the lock key string for a queue: "_scheduler_lock_<queue>_".
#
# @param queue the queue name (converted with to_s)
# @return [String]
def self.lock_key(queue)
  format("_scheduler_lock_%s_", queue)
end
def self.queue_key(queue, hostname = nil)
- if hostname
- "_scheduler_queue_#{queue}_#{hostname}_"
- else
- "_scheduler_queue_#{queue}_"
- end
+ hostname ? "_scheduler_queue_#{queue}_#{hostname}_" : "_scheduler_queue_#{queue}_"
end
def self.schedule_key(klass, hostname = nil)
- if hostname
- "_scheduler_#{klass}_#{hostname}"
- else
- "_scheduler_#{klass}"
- end
+ hostname ? "_scheduler_#{klass}_#{hostname}" : "_scheduler_#{klass}"
end
end
end