lib/rufus/scheduler.rb in rufus-scheduler-3.6.0 vs lib/rufus/scheduler.rb in rufus-scheduler-3.7.0
- old
+ new
@@ -1,629 +1,722 @@
-require 'set'
require 'date' if RUBY_VERSION < '1.9.0'
-require 'time'
require 'thread'
require 'fugit'
-module Rufus
+module Rufus; end
- class Scheduler
+class Rufus::Scheduler
- VERSION = '3.6.0'
+ VERSION = '3.7.0'
- EoTime = ::EtOrbi::EoTime
+ EoTime = ::EtOrbi::EoTime
- require 'rufus/scheduler/util'
- require 'rufus/scheduler/jobs'
- require 'rufus/scheduler/job_array'
- require 'rufus/scheduler/locks'
+ require 'rufus/scheduler/util'
+ require 'rufus/scheduler/jobs_core'
+ require 'rufus/scheduler/jobs_one_time'
+ require 'rufus/scheduler/jobs_repeat'
+ require 'rufus/scheduler/job_array'
+ require 'rufus/scheduler/locks'
- #
- # A common error class for rufus-scheduler
- #
- class Error < StandardError; end
+ #
+ # A common error class for rufus-scheduler
+ #
+ class Error < StandardError; end
- #
- # This error is thrown when the :timeout attribute triggers
- #
- class TimeoutError < Error; end
+ #
+ # This error is thrown when the :timeout attribute triggers
+ #
+ class TimeoutError < Error; end
- #
- # For when the scheduler is not running
- # (it got shut down or didn't start because of a lock)
- #
- class NotRunningError < Error; end
+ #
+ # For when the scheduler is not running
+ # (it got shut down or didn't start because of a lock)
+ #
+ class NotRunningError < Error; end
- #MIN_WORK_THREADS = 3
- MAX_WORK_THREADS = 28
+ #MIN_WORK_THREADS = 3
+ MAX_WORK_THREADS = 28
- attr_accessor :frequency
- attr_reader :started_at
- attr_reader :thread
- attr_reader :thread_key
- attr_reader :mutexes
+ attr_accessor :frequency
+ attr_accessor :discard_past
- #attr_accessor :min_work_threads
- attr_accessor :max_work_threads
+ attr_reader :started_at
+ attr_reader :paused_at
+ attr_reader :thread
+ attr_reader :thread_key
+ attr_reader :mutexes
- attr_accessor :stderr
+ #attr_accessor :min_work_threads
+ attr_accessor :max_work_threads
- attr_reader :work_queue
+ attr_accessor :stderr
- def initialize(opts={})
+ attr_reader :work_queue
- @opts = opts
+ def initialize(opts={})
- @started_at = nil
- @paused = false
+ @opts = opts
- @jobs = JobArray.new
+ @started_at = nil
+ @paused_at = nil
- @frequency = Rufus::Scheduler.parse(opts[:frequency] || 0.300)
- @mutexes = {}
+ @jobs = JobArray.new
- @work_queue = Queue.new
+ @frequency = Rufus::Scheduler.parse(opts[:frequency] || 0.300)
+ @discard_past = opts.has_key?(:discard_past) ? opts[:discard_past] : true
- #@min_work_threads = opts[:min_work_threads] || MIN_WORK_THREADS
- @max_work_threads = opts[:max_work_threads] || MAX_WORK_THREADS
+ @mutexes = {}
- @stderr = $stderr
+ @work_queue = Queue.new
+ @join_queue = Queue.new
- @thread_key = "rufus_scheduler_#{self.object_id}"
+ #@min_work_threads =
+ # opts[:min_work_threads] || opts[:min_worker_threads] ||
+ # MIN_WORK_THREADS
+ @max_work_threads =
+ opts[:max_work_threads] || opts[:max_worker_threads] ||
+ MAX_WORK_THREADS
- @scheduler_lock =
- if lockfile = opts[:lockfile]
- Rufus::Scheduler::FileLock.new(lockfile)
- else
- opts[:scheduler_lock] || Rufus::Scheduler::NullLock.new
- end
+ @stderr = $stderr
- @trigger_lock = opts[:trigger_lock] || Rufus::Scheduler::NullLock.new
+ @thread_key = "rufus_scheduler_#{self.object_id}"
- # If we can't grab the @scheduler_lock, don't run.
- lock || return
+ @scheduler_lock =
+ if lockfile = opts[:lockfile]
+ Rufus::Scheduler::FileLock.new(lockfile)
+ else
+ opts[:scheduler_lock] || Rufus::Scheduler::NullLock.new
+ end
- start
- end
+ @trigger_lock = opts[:trigger_lock] || Rufus::Scheduler::NullLock.new
- # Returns a singleton Rufus::Scheduler instance
- #
- def self.singleton(opts={})
+ # If we can't grab the @scheduler_lock, don't run.
+ lock || return
- @singleton ||= Rufus::Scheduler.new(opts)
- end
+ start
+ end
- # Alias for Rufus::Scheduler.singleton
- #
- def self.s(opts={}); singleton(opts); end
+ # Returns a singleton Rufus::Scheduler instance
+ #
+ def self.singleton(opts={})
- # Releasing the gem would probably require redirecting .start_new to
- # .new and emit a simple deprecation message.
- #
- # For now, let's assume the people pointing at rufus-scheduler/master
- # on GitHub know what they do...
- #
- def self.start_new
+ @singleton ||= Rufus::Scheduler.new(opts)
+ end
- fail "this is rufus-scheduler 3.x, use .new instead of .start_new"
- end
+ # Alias for Rufus::Scheduler.singleton
+ #
+ def self.s(opts={}); singleton(opts); end
- def shutdown(opt=nil)
+ # Releasing the gem would probably require redirecting .start_new to
+ # .new and emit a simple deprecation message.
+ #
+ # For now, let's assume the people pointing at rufus-scheduler/master
+ # on GitHub know what they do...
+ #
+ def self.start_new
- @started_at = nil
+ fail 'this is rufus-scheduler 3.x, use .new instead of .start_new'
+ end
- #jobs.each { |j| j.unschedule }
- #
- # which provokes https://github.com/jmettraux/rufus-scheduler/issues/98
- # using the following instead:
- #
- @jobs.array.each { |j| j.unschedule }
+ def uptime
- @work_queue.clear
+ @started_at ? EoTime.now - @started_at : nil
+ end
- if opt == :wait
- join_all_work_threads
- elsif opt == :kill
- kill_all_work_threads
- end
+ def around_trigger(job)
- unlock
- end
+ yield
+ end
- alias stop shutdown
+ def uptime_s
- def uptime
+ uptime ? self.class.to_duration(uptime) : ''
+ end
- @started_at ? EoTime.now - @started_at : nil
- end
+ def join(time_limit=nil)
- def uptime_s
+ fail NotRunningError.new('cannot join scheduler that is not running') \
+ unless @thread
+ fail ThreadError.new('scheduler thread cannot join itself') \
+ if @thread == Thread.current
- uptime ? self.class.to_duration(uptime) : ''
+ if time_limit
+ time_limit_join(time_limit)
+ else
+ no_time_limit_join
end
+ end
- def join
+ def down?
- fail NotRunningError.new(
- 'cannot join scheduler that is not running'
- ) unless @thread
+ ! @started_at
+ end
- @thread.join
- end
+ def up?
- def down?
+ !! @started_at
+ end
- ! @started_at
- end
+ def paused?
- def up?
+ !! @paused_at
+ end
- !! @started_at
- end
+ def pause
- def paused?
+ @paused_at = EoTime.now
+ end
- @paused
- end
+ def resume(opts={})
- def pause
+ dp = opts[:discard_past]
+ jobs.each { |job| job.resume_discard_past = dp }
- @paused = true
- end
+ @paused_at = nil
+ end
- def resume
+ #--
+ # scheduling methods
+ #++
- @paused = false
- end
+ def at(time, callable=nil, opts={}, &block)
- #--
- # scheduling methods
- #++
+ do_schedule(:once, time, callable, opts, opts[:job], block)
+ end
- def at(time, callable=nil, opts={}, &block)
+ def schedule_at(time, callable=nil, opts={}, &block)
- do_schedule(:once, time, callable, opts, opts[:job], block)
- end
+ do_schedule(:once, time, callable, opts, true, block)
+ end
- def schedule_at(time, callable=nil, opts={}, &block)
+ def in(duration, callable=nil, opts={}, &block)
- do_schedule(:once, time, callable, opts, true, block)
- end
+ do_schedule(:once, duration, callable, opts, opts[:job], block)
+ end
- def in(duration, callable=nil, opts={}, &block)
+ def schedule_in(duration, callable=nil, opts={}, &block)
- do_schedule(:once, duration, callable, opts, opts[:job], block)
- end
+ do_schedule(:once, duration, callable, opts, true, block)
+ end
- def schedule_in(duration, callable=nil, opts={}, &block)
+ def every(duration, callable=nil, opts={}, &block)
- do_schedule(:once, duration, callable, opts, true, block)
- end
+ do_schedule(:every, duration, callable, opts, opts[:job], block)
+ end
- def every(duration, callable=nil, opts={}, &block)
+ def schedule_every(duration, callable=nil, opts={}, &block)
- do_schedule(:every, duration, callable, opts, opts[:job], block)
- end
+ do_schedule(:every, duration, callable, opts, true, block)
+ end
- def schedule_every(duration, callable=nil, opts={}, &block)
+ def interval(duration, callable=nil, opts={}, &block)
- do_schedule(:every, duration, callable, opts, true, block)
- end
+ do_schedule(:interval, duration, callable, opts, opts[:job], block)
+ end
- def interval(duration, callable=nil, opts={}, &block)
+ def schedule_interval(duration, callable=nil, opts={}, &block)
- do_schedule(:interval, duration, callable, opts, opts[:job], block)
- end
+ do_schedule(:interval, duration, callable, opts, true, block)
+ end
- def schedule_interval(duration, callable=nil, opts={}, &block)
+ def cron(cronline, callable=nil, opts={}, &block)
- do_schedule(:interval, duration, callable, opts, true, block)
- end
+ do_schedule(:cron, cronline, callable, opts, opts[:job], block)
+ end
- def cron(cronline, callable=nil, opts={}, &block)
+ def schedule_cron(cronline, callable=nil, opts={}, &block)
- do_schedule(:cron, cronline, callable, opts, opts[:job], block)
- end
+ do_schedule(:cron, cronline, callable, opts, true, block)
+ end
- def schedule_cron(cronline, callable=nil, opts={}, &block)
+ def schedule(arg, callable=nil, opts={}, &block)
- do_schedule(:cron, cronline, callable, opts, true, block)
+ callable, opts = nil, callable if callable.is_a?(Hash)
+ opts = opts.dup
+
+ opts[:_t] = Rufus::Scheduler.parse(arg, opts)
+
+ case opts[:_t]
+ when ::Fugit::Cron then schedule_cron(arg, callable, opts, &block)
+ when ::EtOrbi::EoTime, Time then schedule_at(arg, callable, opts, &block)
+ else schedule_in(arg, callable, opts, &block)
end
+ end
- def schedule(arg, callable=nil, opts={}, &block)
+ def repeat(arg, callable=nil, opts={}, &block)
- callable, opts = nil, callable if callable.is_a?(Hash)
- opts = opts.dup
+ callable, opts = nil, callable if callable.is_a?(Hash)
+ opts = opts.dup
- opts[:_t] = Scheduler.parse(arg, opts)
+ opts[:_t] = Rufus::Scheduler.parse(arg, opts)
- case opts[:_t]
- when ::Fugit::Cron then schedule_cron(arg, callable, opts, &block)
- when ::EtOrbi::EoTime, Time then schedule_at(arg, callable, opts, &block)
- else schedule_in(arg, callable, opts, &block)
- end
+ case opts[:_t]
+ when ::Fugit::Cron then schedule_cron(arg, callable, opts, &block)
+ else schedule_every(arg, callable, opts, &block)
end
+ end
- def repeat(arg, callable=nil, opts={}, &block)
+ def unschedule(job_or_job_id)
- callable, opts = nil, callable if callable.is_a?(Hash)
- opts = opts.dup
+ job, job_id = fetch(job_or_job_id)
- opts[:_t] = Scheduler.parse(arg, opts)
+ fail ArgumentError.new("no job found with id '#{job_id}'") unless job
- case opts[:_t]
- when ::Fugit::Cron then schedule_cron(arg, callable, opts, &block)
- else schedule_every(arg, callable, opts, &block)
- end
- end
+ job.unschedule if job
+ end
- def unschedule(job_or_job_id)
+ #--
+ # jobs methods
+ #++
- job, job_id = fetch(job_or_job_id)
+ # Returns all the scheduled jobs
+ # (even those right before re-schedule).
+ #
+ def jobs(opts={})
- fail ArgumentError.new("no job found with id '#{job_id}'") unless job
+ opts = { opts => true } if opts.is_a?(Symbol)
- job.unschedule if job
+ jobs = @jobs.to_a
+
+ if opts[:running]
+ jobs = jobs.select { |j| j.running? }
+ elsif ! opts[:all]
+ jobs = jobs.reject { |j| j.next_time.nil? || j.unscheduled_at }
end
- #--
- # jobs methods
- #++
+ tags = Array(opts[:tag] || opts[:tags]).collect(&:to_s)
+ jobs = jobs.reject { |j| tags.find { |t| ! j.tags.include?(t) } }
- # Returns all the scheduled jobs
- # (even those right before re-schedule).
- #
- def jobs(opts={})
+ jobs
+ end
- opts = { opts => true } if opts.is_a?(Symbol)
+ def at_jobs(opts={})
- jobs = @jobs.to_a
+ jobs(opts).select { |j| j.is_a?(Rufus::Scheduler::AtJob) }
+ end
- if opts[:running]
- jobs = jobs.select { |j| j.running? }
- elsif ! opts[:all]
- jobs = jobs.reject { |j| j.next_time.nil? || j.unscheduled_at }
- end
+ def in_jobs(opts={})
- tags = Array(opts[:tag] || opts[:tags]).collect(&:to_s)
- jobs = jobs.reject { |j| tags.find { |t| ! j.tags.include?(t) } }
+ jobs(opts).select { |j| j.is_a?(Rufus::Scheduler::InJob) }
+ end
- jobs
- end
+ def every_jobs(opts={})
- def at_jobs(opts={})
+ jobs(opts).select { |j| j.is_a?(Rufus::Scheduler::EveryJob) }
+ end
- jobs(opts).select { |j| j.is_a?(Rufus::Scheduler::AtJob) }
- end
+ def interval_jobs(opts={})
- def in_jobs(opts={})
+ jobs(opts).select { |j| j.is_a?(Rufus::Scheduler::IntervalJob) }
+ end
- jobs(opts).select { |j| j.is_a?(Rufus::Scheduler::InJob) }
- end
+ def cron_jobs(opts={})
- def every_jobs(opts={})
+ jobs(opts).select { |j| j.is_a?(Rufus::Scheduler::CronJob) }
+ end
- jobs(opts).select { |j| j.is_a?(Rufus::Scheduler::EveryJob) }
- end
+ def job(job_id)
- def interval_jobs(opts={})
+ @jobs[job_id]
+ end
- jobs(opts).select { |j| j.is_a?(Rufus::Scheduler::IntervalJob) }
- end
+ # Returns true if the scheduler has acquired the [exclusive] lock and
+ # thus may run.
+ #
+ # Most of the time, a scheduler is run alone and this method should
+ # return true. It is useful in cases where among a group of applications
+ # only one of them should run the scheduler. For schedulers that should
+ # not run, the method should return false.
+ #
+ # Out of the box, rufus-scheduler proposes the
+ # :lockfile => 'path/to/lock/file' scheduler start option. It makes
+ # it easy for schedulers on the same machine to determine which should
+ # run (the first to write the lockfile and lock it). It uses "man 2 flock"
+ # so it probably won't work reliably on distributed file systems.
+ #
+ # If one needs to use a special/different locking mechanism, the scheduler
+ # accepts :scheduler_lock => lock_object. lock_object only needs to respond
+ # to #lock
+ # and #unlock, and both of these methods should be idempotent.
+ #
+ # Look at rufus/scheduler/locks.rb for an example.
+ #
+ def lock
- def cron_jobs(opts={})
+ @scheduler_lock.lock
+ end
- jobs(opts).select { |j| j.is_a?(Rufus::Scheduler::CronJob) }
- end
+ # Sister method to #lock, is called when the scheduler shuts down.
+ #
+ def unlock
- def job(job_id)
+ @trigger_lock.unlock
+ @scheduler_lock.unlock
+ end
- @jobs[job_id]
- end
+ # Callback called when a job is triggered. If the lock cannot be acquired,
+ # the job won't run (though it'll still be scheduled to run again if
+ # necessary).
+ #
+ def confirm_lock
- # Returns true if the scheduler has acquired the [exclusive] lock and
- # thus may run.
- #
- # Most of the time, a scheduler is run alone and this method should
- # return true. It is useful in cases where among a group of applications
- # only one of them should run the scheduler. For schedulers that should
- # not run, the method should return false.
- #
- # Out of the box, rufus-scheduler proposes the
- # :lockfile => 'path/to/lock/file' scheduler start option. It makes
- # it easy for schedulers on the same machine to determine which should
- # run (the first to write the lockfile and lock it). It uses "man 2 flock"
- # so it probably won't work reliably on distributed file systems.
- #
- # If one needs to use a special/different locking mechanism, the scheduler
- # accepts :scheduler_lock => lock_object. lock_object only needs to respond
- # to #lock
- # and #unlock, and both of these methods should be idempotent.
- #
- # Look at rufus/scheduler/locks.rb for an example.
- #
- def lock
+ @trigger_lock.lock
+ end
- @scheduler_lock.lock
- end
+ # Returns true if this job is currently scheduled.
+ #
+ # Takes extra care to answer true if the job is a repeat job
+ # currently firing.
+ #
+ def scheduled?(job_or_job_id)
- # Sister method to #lock, is called when the scheduler shuts down.
- #
- def unlock
+ job, _ = fetch(job_or_job_id)
- @trigger_lock.unlock
- @scheduler_lock.unlock
- end
+ !! (job && job.unscheduled_at.nil? && job.next_time != nil)
+ end
- # Callback called when a job is triggered. If the lock cannot be acquired,
- # the job won't run (though it'll still be scheduled to run again if
- # necessary).
- #
- def confirm_lock
+ # Lists all the threads associated with this scheduler.
+ #
+ def threads
- @trigger_lock.lock
- end
+ Thread.list.select { |t| t[thread_key] }
+ end
- # Returns true if this job is currently scheduled.
- #
- # Takes extra care to answer true if the job is a repeat job
- # currently firing.
- #
- def scheduled?(job_or_job_id)
+ # Lists all the work threads (the ones actually running the scheduled
+ # block code)
+ #
+ # Accepts a query option, which can be set to:
+ # * :all (default), returns all the threads that are work threads
+ # or are currently running a job
+ # * :active, returns all threads that are currently running a job
+ # * :vacant, returns the threads that are not running a job
+ #
+ # If, thanks to :blocking => true, a job is scheduled to monopolize the
+ # main scheduler thread, that thread will get returned when :active or
+ # :all.
+ #
+ def work_threads(query=:all)
- job, _ = fetch(job_or_job_id)
+ ts = threads.select { |t| t[:rufus_scheduler_work_thread] }
- !! (job && job.unscheduled_at.nil? && job.next_time != nil)
+ case query
+ when :active then ts.select { |t| t[:rufus_scheduler_job] }
+ when :vacant then ts.reject { |t| t[:rufus_scheduler_job] }
+ else ts
end
+ end
- # Lists all the threads associated with this scheduler.
- #
- def threads
+ def running_jobs(opts={})
- Thread.list.select { |t| t[thread_key] }
- end
+ jobs(opts.merge(:running => true))
+ end
- # Lists all the work threads (the ones actually running the scheduled
- # block code)
- #
- # Accepts a query option, which can be set to:
- # * :all (default), returns all the threads that are work threads
- # or are currently running a job
- # * :active, returns all threads that are currently running a job
- # * :vacant, returns the threads that are not running a job
- #
- # If, thanks to :blocking => true, a job is scheduled to monopolize the
- # main scheduler thread, that thread will get returned when :active or
- # :all.
- #
- def work_threads(query=:all)
+ def occurrences(time0, time1, format=:per_job)
- ts = threads.select { |t| t[:rufus_scheduler_work_thread] }
+ h = {}
- case query
- when :active then ts.select { |t| t[:rufus_scheduler_job] }
- when :vacant then ts.reject { |t| t[:rufus_scheduler_job] }
- else ts
- end
+ jobs.each do |j|
+ os = j.occurrences(time0, time1)
+ h[j] = os if os.any?
end
- def running_jobs(opts={})
+ if format == :timeline
+ a = []
+ h.each { |j, ts| ts.each { |t| a << [ t, j ] } }
+ a.sort_by { |(t, _)| t }
+ else
+ h
+ end
+ end
- jobs(opts.merge(:running => true))
+ def timeline(time0, time1)
+
+ occurrences(time0, time1, :timeline)
+ end
+
+ def on_error(job, err)
+
+ pre = err.object_id.to_s
+
+ ms = {}; mutexes.each { |k, v| ms[k] = v.locked? }
+
+ stderr.puts("{ #{pre} rufus-scheduler intercepted an error:")
+ stderr.puts(" #{pre} job:")
+ stderr.puts(" #{pre} #{job.class} #{job.original.inspect} #{job.opts.inspect}")
+ # TODO: eventually use a Job#detail or something like that
+ stderr.puts(" #{pre} error:")
+ stderr.puts(" #{pre} #{err.object_id}")
+ stderr.puts(" #{pre} #{err.class}")
+ stderr.puts(" #{pre} #{err}")
+ err.backtrace.each do |l|
+ stderr.puts(" #{pre} #{l}")
end
+ stderr.puts(" #{pre} tz:")
+ stderr.puts(" #{pre} ENV['TZ']: #{ENV['TZ']}")
+ stderr.puts(" #{pre} Time.now: #{Time.now}")
+ stderr.puts(" #{pre} local_tzone: #{EoTime.local_tzone.inspect}")
+ stderr.puts(" #{pre} et-orbi:")
+ stderr.puts(" #{pre} #{EoTime.platform_info}")
+ stderr.puts(" #{pre} scheduler:")
+ stderr.puts(" #{pre} object_id: #{object_id}")
+ stderr.puts(" #{pre} opts:")
+ stderr.puts(" #{pre} #{@opts.inspect}")
+ stderr.puts(" #{pre} frequency: #{self.frequency}")
+ stderr.puts(" #{pre} scheduler_lock: #{@scheduler_lock.inspect}")
+ stderr.puts(" #{pre} trigger_lock: #{@trigger_lock.inspect}")
+ stderr.puts(" #{pre} uptime: #{uptime} (#{uptime_s})")
+ stderr.puts(" #{pre} down?: #{down?}")
+ stderr.puts(" #{pre} frequency: #{frequency.inspect}")
+ stderr.puts(" #{pre} discard_past: #{discard_past.inspect}")
+ stderr.puts(" #{pre} started_at: #{started_at.inspect}")
+ stderr.puts(" #{pre} paused_at: #{paused_at.inspect}")
+ stderr.puts(" #{pre} threads: #{self.threads.size}")
+ stderr.puts(" #{pre} thread: #{self.thread}")
+ stderr.puts(" #{pre} thread_key: #{self.thread_key}")
+ stderr.puts(" #{pre} work_threads: #{work_threads.size}")
+ stderr.puts(" #{pre} active: #{work_threads(:active).size}")
+ stderr.puts(" #{pre} vacant: #{work_threads(:vacant).size}")
+ stderr.puts(" #{pre} max_work_threads: #{max_work_threads}")
+ stderr.puts(" #{pre} mutexes: #{ms.inspect}")
+ stderr.puts(" #{pre} jobs: #{jobs.size}")
+ stderr.puts(" #{pre} at_jobs: #{at_jobs.size}")
+ stderr.puts(" #{pre} in_jobs: #{in_jobs.size}")
+ stderr.puts(" #{pre} every_jobs: #{every_jobs.size}")
+ stderr.puts(" #{pre} interval_jobs: #{interval_jobs.size}")
+ stderr.puts(" #{pre} cron_jobs: #{cron_jobs.size}")
+ stderr.puts(" #{pre} running_jobs: #{running_jobs.size}")
+ stderr.puts(" #{pre} work_queue:")
+ stderr.puts(" #{pre} size: #{@work_queue.size}")
+ stderr.puts(" #{pre} num_waiting: #{@work_queue.num_waiting}")
+ stderr.puts(" #{pre} join_queue:")
+ stderr.puts(" #{pre} size: #{@join_queue.size}")
+ stderr.puts(" #{pre} num_waiting: #{@join_queue.num_waiting}")
+ stderr.puts("} #{pre} .")
- def occurrences(time0, time1, format=:per_job)
+ rescue => e
- h = {}
+ stderr.puts("failure in #on_error itself:")
+ stderr.puts(e.inspect)
+ stderr.puts(e.backtrace)
- jobs.each do |j|
- os = j.occurrences(time0, time1)
- h[j] = os if os.any?
- end
+ ensure
- if format == :timeline
- a = []
- h.each { |j, ts| ts.each { |t| a << [ t, j ] } }
- a.sort_by { |(t, _)| t }
- else
- h
+ stderr.flush
+ end
+
+ def shutdown(opt=nil)
+
+ opts =
+ case opt
+ when Symbol then { opt => true }
+ when Hash then opt
+ else {}
end
- end
- def timeline(time0, time1)
+ @jobs.unschedule_all
- occurrences(time0, time1, :timeline)
+ if opts[:wait] || opts[:join]
+ join_shutdown(opts)
+ elsif opts[:kill]
+ kill_shutdown(opts)
+ else
+ regular_shutdown(opts)
end
- def on_error(job, err)
+ @work_queue.clear
- pre = err.object_id.to_s
+ unlock
- ms = {}; mutexes.each { |k, v| ms[k] = v.locked? }
+ @thread.join
+ end
+ alias stop shutdown
- stderr.puts("{ #{pre} rufus-scheduler intercepted an error:")
- stderr.puts(" #{pre} job:")
- stderr.puts(" #{pre} #{job.class} #{job.original.inspect} #{job.opts.inspect}")
- # TODO: eventually use a Job#detail or something like that
- stderr.puts(" #{pre} error:")
- stderr.puts(" #{pre} #{err.object_id}")
- stderr.puts(" #{pre} #{err.class}")
- stderr.puts(" #{pre} #{err}")
- err.backtrace.each do |l|
- stderr.puts(" #{pre} #{l}")
- end
- stderr.puts(" #{pre} tz:")
- stderr.puts(" #{pre} ENV['TZ']: #{ENV['TZ']}")
- stderr.puts(" #{pre} Time.now: #{Time.now}")
- stderr.puts(" #{pre} local_tzone: #{EoTime.local_tzone.inspect}")
- stderr.puts(" #{pre} et-orbi:")
- stderr.puts(" #{pre} #{EoTime.platform_info}")
- stderr.puts(" #{pre} scheduler:")
- stderr.puts(" #{pre} object_id: #{object_id}")
- stderr.puts(" #{pre} opts:")
- stderr.puts(" #{pre} #{@opts.inspect}")
- stderr.puts(" #{pre} frequency: #{self.frequency}")
- stderr.puts(" #{pre} scheduler_lock: #{@scheduler_lock.inspect}")
- stderr.puts(" #{pre} trigger_lock: #{@trigger_lock.inspect}")
- stderr.puts(" #{pre} uptime: #{uptime} (#{uptime_s})")
- stderr.puts(" #{pre} down?: #{down?}")
- stderr.puts(" #{pre} threads: #{self.threads.size}")
- stderr.puts(" #{pre} thread: #{self.thread}")
- stderr.puts(" #{pre} thread_key: #{self.thread_key}")
- stderr.puts(" #{pre} work_threads: #{work_threads.size}")
- stderr.puts(" #{pre} active: #{work_threads(:active).size}")
- stderr.puts(" #{pre} vacant: #{work_threads(:vacant).size}")
- stderr.puts(" #{pre} max_work_threads: #{max_work_threads}")
- stderr.puts(" #{pre} mutexes: #{ms.inspect}")
- stderr.puts(" #{pre} jobs: #{jobs.size}")
- stderr.puts(" #{pre} at_jobs: #{at_jobs.size}")
- stderr.puts(" #{pre} in_jobs: #{in_jobs.size}")
- stderr.puts(" #{pre} every_jobs: #{every_jobs.size}")
- stderr.puts(" #{pre} interval_jobs: #{interval_jobs.size}")
- stderr.puts(" #{pre} cron_jobs: #{cron_jobs.size}")
- stderr.puts(" #{pre} running_jobs: #{running_jobs.size}")
- stderr.puts(" #{pre} work_queue: #{work_queue.size}")
- stderr.puts("} #{pre} .")
+ protected
- rescue => e
+ def join_shutdown(opts)
- stderr.puts("failure in #on_error itself:")
- stderr.puts(e.inspect)
- stderr.puts(e.backtrace)
+ limit = opts[:wait] || opts[:join]
+ limit = limit.is_a?(Numeric) ? limit : nil
- ensure
+ #@started_at = nil
+ #
+ # when @started_at is nil, the scheduler thread exits, here
+ # we want it to exit when all the work threads have been joined
+ # hence it's set to nil later on
+ #
+ @paused_at = EoTime.now
- stderr.flush
- end
+ (work_threads.size * 2 + 1).times { @work_queue << :shutdown }
- protected
+ work_threads
+ .collect { |wt|
+ wt == Thread.current ? nil : Thread.new { wt.join(limit); wt.kill } }
+ .each { |st|
+ st.join if st }
- # Returns [ job, job_id ]
- #
- def fetch(job_or_job_id)
+ @started_at = nil
+ end
- if job_or_job_id.respond_to?(:job_id)
- [ job_or_job_id, job_or_job_id.job_id ]
- else
- [ job(job_or_job_id), job_or_job_id ]
- end
- end
+ def kill_shutdown(opts)
- def terminate_all_jobs
+ @started_at = nil
+ work_threads.each(&:kill)
+ end
- jobs.each { |j| j.unschedule }
+ def regular_shutdown(opts)
- sleep 0.01 while running_jobs.size > 0
- end
+ @started_at = nil
+ end
- def join_all_work_threads
+ def time_limit_join(limit)
- work_threads.size.times { @work_queue << :sayonara }
+ fail ArgumentError.new("limit #{limit.inspect} should be > 0") \
+ unless limit.is_a?(Numeric) && limit > 0
- work_threads.each { |t| t.join }
+ t0 = monow
+ f = [ limit.to_f / 20, 0.100 ].min
- @work_queue.clear
+ while monow - t0 < limit
+ r =
+ begin
+ @join_queue.pop(true)
+ rescue ThreadError => e
+ # #<ThreadError: queue empty>
+ false
+ end
+ return r if r
+ sleep(f)
end
- def kill_all_work_threads
+ nil
+ end
- work_threads.each { |t| t.kill }
+ def no_time_limit_join
+
+ @join_queue.pop
+ end
+
+ # Returns [ job, job_id ]
+ #
+ def fetch(job_or_job_id)
+
+ if job_or_job_id.respond_to?(:job_id)
+ [ job_or_job_id, job_or_job_id.job_id ]
+ else
+ [ job(job_or_job_id), job_or_job_id ]
end
+ end
- #def free_all_work_threads
- #
- # work_threads.each { |t| t.raise(KillSignal) }
- #end
+ def terminate_all_jobs
- def start
+ jobs.each { |j| j.unschedule }
- @started_at = EoTime.now
+ sleep 0.01 while running_jobs.size > 0
+ end
- @thread =
- Thread.new do
+ #def free_all_work_threads
+ #
+ # work_threads.each { |t| t.raise(KillSignal) }
+ #end
- while @started_at do
+ def start
- unschedule_jobs
- trigger_jobs unless @paused
- timeout_jobs
+ @started_at = EoTime.now
- sleep(@frequency)
- end
+ @thread =
+ Thread.new do
+
+ while @started_at do
+
+ unschedule_jobs
+ trigger_jobs unless @paused_at
+ timeout_jobs
+
+ sleep(@frequency)
end
- @thread[@thread_key] = true
- @thread[:rufus_scheduler] = self
- @thread[:name] = @opts[:thread_name] || "#{@thread_key}_scheduler"
- end
+ rejoin
+ end
- def unschedule_jobs
+ @thread[@thread_key] = true
+ @thread[:rufus_scheduler] = self
+ @thread[:name] = @opts[:thread_name] || "#{@thread_key}_scheduler"
+ end
- @jobs.delete_unscheduled
- end
+ def unschedule_jobs
- def trigger_jobs
+ @jobs.delete_unscheduled
+ end
- now = EoTime.now
+ def trigger_jobs
- @jobs.each(now) do |job|
+ now = EoTime.now
- job.trigger(now)
- end
+ @jobs.each(now) do |job|
+
+ job.trigger(now)
end
+ end
- def timeout_jobs
+ def timeout_jobs
- work_threads(:active).each do |t|
+ work_threads(:active).each do |t|
- job = t[:rufus_scheduler_job]
- to = t[:rufus_scheduler_timeout]
- ts = t[:rufus_scheduler_time]
+ job = t[:rufus_scheduler_job]
+ to = t[:rufus_scheduler_timeout]
+ ts = t[:rufus_scheduler_time]
- next unless job && to && ts
- # thread might just have become inactive (job -> nil)
+ next unless job && to && ts
+ # thread might just have become inactive (job -> nil)
- to = ts + to unless to.is_a?(EoTime)
+ to = ts + to unless to.is_a?(EoTime)
- next if to > EoTime.now
+ next if to > EoTime.now
- t.raise(Rufus::Scheduler::TimeoutError)
- end
+ t.raise(Rufus::Scheduler::TimeoutError)
end
+ end
- def do_schedule(job_type, t, callable, opts, return_job_instance, block)
+ def rejoin
- fail NotRunningError.new(
- 'cannot schedule, scheduler is down or shutting down'
- ) if @started_at.nil?
+ (@join_queue.num_waiting * 2 + 1).times { @join_queue << @thread }
+ end
- callable, opts = nil, callable if callable.is_a?(Hash)
- opts = opts.dup unless opts.has_key?(:_t)
+ def do_schedule(job_type, t, callable, opts, return_job_instance, block)
- return_job_instance ||= opts[:job]
+ fail NotRunningError.new(
+ 'cannot schedule, scheduler is down or shutting down'
+ ) if @started_at.nil?
- job_class =
- case job_type
- when :once
- opts[:_t] ||= Rufus::Scheduler.parse(t, opts)
- opts[:_t].is_a?(Numeric) ? InJob : AtJob
- when :every
- EveryJob
- when :interval
- IntervalJob
- when :cron
- CronJob
- end
+ callable, opts = nil, callable if callable.is_a?(Hash)
+ opts = opts.dup unless opts.has_key?(:_t)
- job = job_class.new(self, t, opts, block || callable)
- job.check_frequency
+ return_job_instance ||= opts[:job]
- @jobs.push(job)
+ job_class =
+ case job_type
+ when :once
+ opts[:_t] ||= Rufus::Scheduler.parse(t, opts)
+ opts[:_t].is_a?(Numeric) ? InJob : AtJob
+ when :every
+ EveryJob
+ when :interval
+ IntervalJob
+ when :cron
+ CronJob
+ end
- return_job_instance ? job : job.job_id
- end
+ job = job_class.new(self, t, opts, block || callable)
+ job.check_frequency
+
+ @jobs.push(job)
+
+ return_job_instance ? job : job.job_id
end
+
+ def monow; self.class.monow; end
+ def ltstamp; self.class.ltstamp; end
end