lib/rufus/scheduler.rb in rufus-scheduler-3.6.0 vs lib/rufus/scheduler.rb in rufus-scheduler-3.7.0

- old
+ new

@@ -1,629 +1,722 @@ -require 'set' require 'date' if RUBY_VERSION < '1.9.0' -require 'time' require 'thread' require 'fugit' -module Rufus +module Rufus; end - class Scheduler +class Rufus::Scheduler - VERSION = '3.6.0' + VERSION = '3.7.0' - EoTime = ::EtOrbi::EoTime + EoTime = ::EtOrbi::EoTime - require 'rufus/scheduler/util' - require 'rufus/scheduler/jobs' - require 'rufus/scheduler/job_array' - require 'rufus/scheduler/locks' + require 'rufus/scheduler/util' + require 'rufus/scheduler/jobs_core' + require 'rufus/scheduler/jobs_one_time' + require 'rufus/scheduler/jobs_repeat' + require 'rufus/scheduler/job_array' + require 'rufus/scheduler/locks' - # - # A common error class for rufus-scheduler - # - class Error < StandardError; end + # + # A common error class for rufus-scheduler + # + class Error < StandardError; end - # - # This error is thrown when the :timeout attribute triggers - # - class TimeoutError < Error; end + # + # This error is thrown when the :timeout attribute triggers + # + class TimeoutError < Error; end - # - # For when the scheduler is not running - # (it got shut down or didn't start because of a lock) - # - class NotRunningError < Error; end + # + # For when the scheduler is not running + # (it got shut down or didn't start because of a lock) + # + class NotRunningError < Error; end - #MIN_WORK_THREADS = 3 - MAX_WORK_THREADS = 28 + #MIN_WORK_THREADS = 3 + MAX_WORK_THREADS = 28 - attr_accessor :frequency - attr_reader :started_at - attr_reader :thread - attr_reader :thread_key - attr_reader :mutexes + attr_accessor :frequency + attr_accessor :discard_past - #attr_accessor :min_work_threads - attr_accessor :max_work_threads + attr_reader :started_at + attr_reader :paused_at + attr_reader :thread + attr_reader :thread_key + attr_reader :mutexes - attr_accessor :stderr + #attr_accessor :min_work_threads + attr_accessor :max_work_threads - attr_reader :work_queue + attr_accessor :stderr - def initialize(opts={}) + attr_reader :work_queue - @opts = opts + def initialize(opts={}) - @started_at = nil - @paused = false + @opts = opts - @jobs = JobArray.new + @started_at = nil + @paused_at = nil - @frequency = Rufus::Scheduler.parse(opts[:frequency] || 0.300) - @mutexes = {} + @jobs = JobArray.new - @work_queue = Queue.new + @frequency = Rufus::Scheduler.parse(opts[:frequency] || 0.300) + @discard_past = opts.has_key?(:discard_past) ? opts[:discard_past] : true - #@min_work_threads = opts[:min_work_threads] || MIN_WORK_THREADS - @max_work_threads = opts[:max_work_threads] || MAX_WORK_THREADS + @mutexes = {} - @stderr = $stderr + @work_queue = Queue.new + @join_queue = Queue.new - @thread_key = "rufus_scheduler_#{self.object_id}" + #@min_work_threads = + # opts[:min_work_threads] || opts[:min_worker_threads] || + # MIN_WORK_THREADS + @max_work_threads = + opts[:max_work_threads] || opts[:max_worker_threads] || + MAX_WORK_THREADS - @scheduler_lock = - if lockfile = opts[:lockfile] - Rufus::Scheduler::FileLock.new(lockfile) - else - opts[:scheduler_lock] || Rufus::Scheduler::NullLock.new - end + @stderr = $stderr - @trigger_lock = opts[:trigger_lock] || Rufus::Scheduler::NullLock.new + @thread_key = "rufus_scheduler_#{self.object_id}" - # If we can't grab the @scheduler_lock, don't run. - lock || return + @scheduler_lock = + if lockfile = opts[:lockfile] + Rufus::Scheduler::FileLock.new(lockfile) + else + opts[:scheduler_lock] || Rufus::Scheduler::NullLock.new + end - start - end + @trigger_lock = opts[:trigger_lock] || Rufus::Scheduler::NullLock.new - # Returns a singleton Rufus::Scheduler instance - # - def self.singleton(opts={}) + # If we can't grab the @scheduler_lock, don't run. + lock || return - @singleton ||= Rufus::Scheduler.new(opts) - end + start + end - # Alias for Rufus::Scheduler.singleton - # - def self.s(opts={}); singleton(opts); end + # Returns a singleton Rufus::Scheduler instance + # + def self.singleton(opts={}) - # Releasing the gem would probably require redirecting .start_new to - # .new and emit a simple deprecation message. - # - # For now, let's assume the people pointing at rufus-scheduler/master - # on GitHub know what they do... - # - def self.start_new + @singleton ||= Rufus::Scheduler.new(opts) + end - fail "this is rufus-scheduler 3.x, use .new instead of .start_new" - end + # Alias for Rufus::Scheduler.singleton + # + def self.s(opts={}); singleton(opts); end - def shutdown(opt=nil) + # Releasing the gem would probably require redirecting .start_new to + # .new and emit a simple deprecation message. + # + # For now, let's assume the people pointing at rufus-scheduler/master + # on GitHub know what they do... + # + def self.start_new - @started_at = nil + fail 'this is rufus-scheduler 3.x, use .new instead of .start_new' + end - #jobs.each { |j| j.unschedule } - # - # which provokes https://github.com/jmettraux/rufus-scheduler/issues/98 - # using the following instead: - # - @jobs.array.each { |j| j.unschedule } + def uptime - @work_queue.clear + @started_at ? EoTime.now - @started_at : nil + end - if opt == :wait - join_all_work_threads - elsif opt == :kill - kill_all_work_threads - end + def around_trigger(job) - unlock - end + yield + end - alias stop shutdown + def uptime_s - def uptime + uptime ? self.class.to_duration(uptime) : '' + end - @started_at ? EoTime.now - @started_at : nil - end + def join(time_limit=nil) - def uptime_s + fail NotRunningError.new('cannot join scheduler that is not running') \ + unless @thread + fail ThreadError.new('scheduler thread cannot join itself') \ + if @thread == Thread.current - uptime ? self.class.to_duration(uptime) : '' + if time_limit + time_limit_join(time_limit) + else + no_time_limit_join end + end - def join + def down? - fail NotRunningError.new( - 'cannot join scheduler that is not running' - ) unless @thread + ! @started_at + end - @thread.join - end + def up? - def down? + !! @started_at + end - ! @started_at - end + def paused? - def up? + !! @paused_at + end - !! @started_at - end + def pause - def paused? + @paused_at = EoTime.now + end - @paused - end + def resume(opts={}) - def pause + dp = opts[:discard_past] + jobs.each { |job| job.resume_discard_past = dp } - @paused = true - end + @paused_at = nil + end - def resume + #-- + # scheduling methods + #++ - @paused = false - end + def at(time, callable=nil, opts={}, &block) - #-- - # scheduling methods - #++ + do_schedule(:once, time, callable, opts, opts[:job], block) + end - def at(time, callable=nil, opts={}, &block) + def schedule_at(time, callable=nil, opts={}, &block) - do_schedule(:once, time, callable, opts, opts[:job], block) - end + do_schedule(:once, time, callable, opts, true, block) + end - def schedule_at(time, callable=nil, opts={}, &block) + def in(duration, callable=nil, opts={}, &block) - do_schedule(:once, time, callable, opts, true, block) - end + do_schedule(:once, duration, callable, opts, opts[:job], block) + end - def in(duration, callable=nil, opts={}, &block) + def schedule_in(duration, callable=nil, opts={}, &block) - do_schedule(:once, duration, callable, opts, opts[:job], block) - end + do_schedule(:once, duration, callable, opts, true, block) + end - def schedule_in(duration, callable=nil, opts={}, &block) + def every(duration, callable=nil, opts={}, &block) - do_schedule(:once, duration, callable, opts, true, block) - end + do_schedule(:every, duration, callable, opts, opts[:job], block) + end - def every(duration, callable=nil, opts={}, &block) + def schedule_every(duration, callable=nil, opts={}, &block) - do_schedule(:every, duration, callable, opts, opts[:job], block) - end + do_schedule(:every, duration, callable, opts, true, block) + end - def schedule_every(duration, callable=nil, opts={}, &block) + def interval(duration, callable=nil, opts={}, &block) - do_schedule(:every, duration, callable, opts, true, block) - end + do_schedule(:interval, duration, callable, opts, opts[:job], block) + end - def interval(duration, callable=nil, opts={}, &block) + def schedule_interval(duration, callable=nil, opts={}, &block) - do_schedule(:interval, duration, callable, opts, opts[:job], block) - end + do_schedule(:interval, duration, callable, opts, true, block) + end - def schedule_interval(duration, callable=nil, opts={}, &block) + def cron(cronline, callable=nil, opts={}, &block) - do_schedule(:interval, duration, callable, opts, true, block) - end + do_schedule(:cron, cronline, callable, opts, opts[:job], block) + end - def cron(cronline, callable=nil, opts={}, &block) + def schedule_cron(cronline, callable=nil, opts={}, &block) - do_schedule(:cron, cronline, callable, opts, opts[:job], block) - end + do_schedule(:cron, cronline, callable, opts, true, block) + end - def schedule_cron(cronline, callable=nil, opts={}, &block) + def schedule(arg, callable=nil, opts={}, &block) - do_schedule(:cron, cronline, callable, opts, true, block) + callable, opts = nil, callable if callable.is_a?(Hash) + opts = opts.dup + + opts[:_t] = Rufus::Scheduler.parse(arg, opts) + + case opts[:_t] + when ::Fugit::Cron then schedule_cron(arg, callable, opts, &block) + when ::EtOrbi::EoTime, Time then schedule_at(arg, callable, opts, &block) + else schedule_in(arg, callable, opts, &block) end + end - def schedule(arg, callable=nil, opts={}, &block) + def repeat(arg, callable=nil, opts={}, &block) - callable, opts = nil, callable if callable.is_a?(Hash) - opts = opts.dup + callable, opts = nil, callable if callable.is_a?(Hash) + opts = opts.dup - opts[:_t] = Scheduler.parse(arg, opts) + opts[:_t] = Rufus::Scheduler.parse(arg, opts) - case opts[:_t] - when ::Fugit::Cron then schedule_cron(arg, callable, opts, &block) - when ::EtOrbi::EoTime, Time then schedule_at(arg, callable, opts, &block) - else schedule_in(arg, callable, opts, &block) - end + case opts[:_t] + when ::Fugit::Cron then schedule_cron(arg, callable, opts, &block) + else schedule_every(arg, callable, opts, &block) end + end - def repeat(arg, callable=nil, opts={}, &block) + def unschedule(job_or_job_id) - callable, opts = nil, callable if callable.is_a?(Hash) - opts = opts.dup + job, job_id = fetch(job_or_job_id) - opts[:_t] = Scheduler.parse(arg, opts) + fail ArgumentError.new("no job found with id '#{job_id}'") unless job - case opts[:_t] - when ::Fugit::Cron then schedule_cron(arg, callable, opts, &block) - else schedule_every(arg, callable, opts, &block) - end - end + job.unschedule if job + end - def unschedule(job_or_job_id) + #-- + # jobs methods + #++ - job, job_id = fetch(job_or_job_id) + # Returns all the scheduled jobs + # (even those right before re-schedule). + # + def jobs(opts={}) - fail ArgumentError.new("no job found with id '#{job_id}'") unless job + opts = { opts => true } if opts.is_a?(Symbol) - job.unschedule if job + jobs = @jobs.to_a + + if opts[:running] + jobs = jobs.select { |j| j.running? } + elsif ! opts[:all] + jobs = jobs.reject { |j| j.next_time.nil? || j.unscheduled_at } end - #-- - # jobs methods - #++ + tags = Array(opts[:tag] || opts[:tags]).collect(&:to_s) + jobs = jobs.reject { |j| tags.find { |t| ! j.tags.include?(t) } } - # Returns all the scheduled jobs - # (even those right before re-schedule). - # - def jobs(opts={}) + jobs + end - opts = { opts => true } if opts.is_a?(Symbol) + def at_jobs(opts={}) - jobs = @jobs.to_a + jobs(opts).select { |j| j.is_a?(Rufus::Scheduler::AtJob) } + end - if opts[:running] - jobs = jobs.select { |j| j.running? } - elsif ! opts[:all] - jobs = jobs.reject { |j| j.next_time.nil? || j.unscheduled_at } - end + def in_jobs(opts={}) - tags = Array(opts[:tag] || opts[:tags]).collect(&:to_s) - jobs = jobs.reject { |j| tags.find { |t| ! j.tags.include?(t) } } + jobs(opts).select { |j| j.is_a?(Rufus::Scheduler::InJob) } + end - jobs - end + def every_jobs(opts={}) - def at_jobs(opts={}) + jobs(opts).select { |j| j.is_a?(Rufus::Scheduler::EveryJob) } + end - jobs(opts).select { |j| j.is_a?(Rufus::Scheduler::AtJob) } - end + def interval_jobs(opts={}) - def in_jobs(opts={}) + jobs(opts).select { |j| j.is_a?(Rufus::Scheduler::IntervalJob) } + end - jobs(opts).select { |j| j.is_a?(Rufus::Scheduler::InJob) } - end + def cron_jobs(opts={}) - def every_jobs(opts={}) + jobs(opts).select { |j| j.is_a?(Rufus::Scheduler::CronJob) } + end - jobs(opts).select { |j| j.is_a?(Rufus::Scheduler::EveryJob) } - end + def job(job_id) - def interval_jobs(opts={}) + @jobs[job_id] + end - jobs(opts).select { |j| j.is_a?(Rufus::Scheduler::IntervalJob) } - end + # Returns true if the scheduler has acquired the [exclusive] lock and + # thus may run. + # + # Most of the time, a scheduler is run alone and this method should + # return true. It is useful in cases where among a group of applications + # only one of them should run the scheduler. For schedulers that should + # not run, the method should return false. + # + # Out of the box, rufus-scheduler proposes the + # :lockfile => 'path/to/lock/file' scheduler start option. It makes + # it easy for schedulers on the same machine to determine which should + # run (the first to write the lockfile and lock it). It uses "man 2 flock" + # so it probably won't work reliably on distributed file systems. + # + # If one needs to use a special/different locking mechanism, the scheduler + # accepts :scheduler_lock => lock_object. lock_object only needs to respond + # to #lock + # and #unlock, and both of these methods should be idempotent. + # + # Look at rufus/scheduler/locks.rb for an example. + # + def lock - def cron_jobs(opts={}) + @scheduler_lock.lock + end - jobs(opts).select { |j| j.is_a?(Rufus::Scheduler::CronJob) } - end + # Sister method to #lock, is called when the scheduler shuts down. + # + def unlock - def job(job_id) + @trigger_lock.unlock + @scheduler_lock.unlock + end - @jobs[job_id] - end + # Callback called when a job is triggered. If the lock cannot be acquired, + # the job won't run (though it'll still be scheduled to run again if + # necessary). + # + def confirm_lock - # Returns true if the scheduler has acquired the [exclusive] lock and - # thus may run. - # - # Most of the time, a scheduler is run alone and this method should - # return true. It is useful in cases where among a group of applications - # only one of them should run the scheduler. For schedulers that should - # not run, the method should return false. - # - # Out of the box, rufus-scheduler proposes the - # :lockfile => 'path/to/lock/file' scheduler start option. It makes - # it easy for schedulers on the same machine to determine which should - # run (the first to write the lockfile and lock it). It uses "man 2 flock" - # so it probably won't work reliably on distributed file systems. - # - # If one needs to use a special/different locking mechanism, the scheduler - # accepts :scheduler_lock => lock_object. lock_object only needs to respond - # to #lock - # and #unlock, and both of these methods should be idempotent. - # - # Look at rufus/scheduler/locks.rb for an example. - # - def lock + @trigger_lock.lock + end - @scheduler_lock.lock - end + # Returns true if this job is currently scheduled. + # + # Takes extra care to answer true if the job is a repeat job + # currently firing. + # + def scheduled?(job_or_job_id) - # Sister method to #lock, is called when the scheduler shuts down. - # - def unlock + job, _ = fetch(job_or_job_id) - @trigger_lock.unlock - @scheduler_lock.unlock - end + !! (job && job.unscheduled_at.nil? && job.next_time != nil) + end - # Callback called when a job is triggered. If the lock cannot be acquired, - # the job won't run (though it'll still be scheduled to run again if - # necessary). - # - def confirm_lock + # Lists all the threads associated with this scheduler. + # + def threads - @trigger_lock.lock - end + Thread.list.select { |t| t[thread_key] } + end - # Returns true if this job is currently scheduled. - # - # Takes extra care to answer true if the job is a repeat job - # currently firing. - # - def scheduled?(job_or_job_id) + # Lists all the work threads (the ones actually running the scheduled + # block code) + # + # Accepts a query option, which can be set to: + # * :all (default), returns all the threads that are work threads + # or are currently running a job + # * :active, returns all threads that are currently running a job + # * :vacant, returns the threads that are not running a job + # + # If, thanks to :blocking => true, a job is scheduled to monopolize the + # main scheduler thread, that thread will get returned when :active or + # :all. + # + def work_threads(query=:all) - job, _ = fetch(job_or_job_id) + ts = threads.select { |t| t[:rufus_scheduler_work_thread] } - !! (job && job.unscheduled_at.nil? && job.next_time != nil) + case query + when :active then ts.select { |t| t[:rufus_scheduler_job] } + when :vacant then ts.reject { |t| t[:rufus_scheduler_job] } + else ts end + end - # Lists all the threads associated with this scheduler. - # - def threads + def running_jobs(opts={}) - Thread.list.select { |t| t[thread_key] } - end + jobs(opts.merge(:running => true)) + end - # Lists all the work threads (the ones actually running the scheduled - # block code) - # - # Accepts a query option, which can be set to: - # * :all (default), returns all the threads that are work threads - # or are currently running a job - # * :active, returns all threads that are currently running a job - # * :vacant, returns the threads that are not running a job - # - # If, thanks to :blocking => true, a job is scheduled to monopolize the - # main scheduler thread, that thread will get returned when :active or - # :all. - # - def work_threads(query=:all) + def occurrences(time0, time1, format=:per_job) - ts = threads.select { |t| t[:rufus_scheduler_work_thread] } + h = {} - case query - when :active then ts.select { |t| t[:rufus_scheduler_job] } - when :vacant then ts.reject { |t| t[:rufus_scheduler_job] } - else ts - end + jobs.each do |j| + os = j.occurrences(time0, time1) + h[j] = os if os.any? end - def running_jobs(opts={}) + if format == :timeline + a = [] + h.each { |j, ts| ts.each { |t| a << [ t, j ] } } + a.sort_by { |(t, _)| t } + else + h + end + end - jobs(opts.merge(:running => true)) + def timeline(time0, time1) + + occurrences(time0, time1, :timeline) + end + + def on_error(job, err) + + pre = err.object_id.to_s + + ms = {}; mutexes.each { |k, v| ms[k] = v.locked? } + + stderr.puts("{ #{pre} rufus-scheduler intercepted an error:") + stderr.puts(" #{pre} job:") + stderr.puts(" #{pre} #{job.class} #{job.original.inspect} #{job.opts.inspect}") + # TODO: eventually use a Job#detail or something like that + stderr.puts(" #{pre} error:") + stderr.puts(" #{pre} #{err.object_id}") + stderr.puts(" #{pre} #{err.class}") + stderr.puts(" #{pre} #{err}") + err.backtrace.each do |l| + stderr.puts(" #{pre} #{l}") end + stderr.puts(" #{pre} tz:") + stderr.puts(" #{pre} ENV['TZ']: #{ENV['TZ']}") + stderr.puts(" #{pre} Time.now: #{Time.now}") + stderr.puts(" #{pre} local_tzone: #{EoTime.local_tzone.inspect}") + stderr.puts(" #{pre} et-orbi:") + stderr.puts(" #{pre} #{EoTime.platform_info}") + stderr.puts(" #{pre} scheduler:") + stderr.puts(" #{pre} object_id: #{object_id}") + stderr.puts(" #{pre} opts:") + stderr.puts(" #{pre} #{@opts.inspect}") + stderr.puts(" #{pre} frequency: #{self.frequency}") + stderr.puts(" #{pre} scheduler_lock: #{@scheduler_lock.inspect}") + stderr.puts(" #{pre} trigger_lock: #{@trigger_lock.inspect}") + stderr.puts(" #{pre} uptime: #{uptime} (#{uptime_s})") + stderr.puts(" #{pre} down?: #{down?}") + stderr.puts(" #{pre} frequency: #{frequency.inspect}") + stderr.puts(" #{pre} discard_past: #{discard_past.inspect}") + stderr.puts(" #{pre} started_at: #{started_at.inspect}") + stderr.puts(" #{pre} paused_at: #{paused_at.inspect}") + stderr.puts(" #{pre} threads: #{self.threads.size}") + stderr.puts(" #{pre} thread: #{self.thread}") + stderr.puts(" #{pre} thread_key: #{self.thread_key}") + stderr.puts(" #{pre} work_threads: #{work_threads.size}") + stderr.puts(" #{pre} active: #{work_threads(:active).size}") + stderr.puts(" #{pre} vacant: #{work_threads(:vacant).size}") + stderr.puts(" #{pre} max_work_threads: #{max_work_threads}") + stderr.puts(" #{pre} mutexes: #{ms.inspect}") + stderr.puts(" #{pre} jobs: #{jobs.size}") + stderr.puts(" #{pre} at_jobs: #{at_jobs.size}") + stderr.puts(" #{pre} in_jobs: #{in_jobs.size}") + stderr.puts(" #{pre} every_jobs: #{every_jobs.size}") + stderr.puts(" #{pre} interval_jobs: #{interval_jobs.size}") + stderr.puts(" #{pre} cron_jobs: #{cron_jobs.size}") + stderr.puts(" #{pre} running_jobs: #{running_jobs.size}") + stderr.puts(" #{pre} work_queue:") + stderr.puts(" #{pre} size: #{@work_queue.size}") + stderr.puts(" #{pre} num_waiting: #{@work_queue.num_waiting}") + stderr.puts(" #{pre} join_queue:") + stderr.puts(" #{pre} size: #{@join_queue.size}") + stderr.puts(" #{pre} num_waiting: #{@join_queue.num_waiting}") + stderr.puts("} #{pre} .") - def occurrences(time0, time1, format=:per_job) + rescue => e - h = {} + stderr.puts("failure in #on_error itself:") + stderr.puts(e.inspect) + stderr.puts(e.backtrace) - jobs.each do |j| - os = j.occurrences(time0, time1) - h[j] = os if os.any? - end + ensure - if format == :timeline - a = [] - h.each { |j, ts| ts.each { |t| a << [ t, j ] } } - a.sort_by { |(t, _)| t } - else - h + stderr.flush + end + + def shutdown(opt=nil) + + opts = + case opt + when Symbol then { opt => true } + when Hash then opt + else {} end - end - def timeline(time0, time1) + @jobs.unschedule_all - occurrences(time0, time1, :timeline) + if opts[:wait] || opts[:join] + join_shutdown(opts) + elsif opts[:kill] + kill_shutdown(opts) + else + regular_shutdown(opts) end - def on_error(job, err) + @work_queue.clear - pre = err.object_id.to_s + unlock - ms = {}; mutexes.each { |k, v| ms[k] = v.locked? } + @thread.join + end + alias stop shutdown - stderr.puts("{ #{pre} rufus-scheduler intercepted an error:") - stderr.puts(" #{pre} job:") - stderr.puts(" #{pre} #{job.class} #{job.original.inspect} #{job.opts.inspect}") - # TODO: eventually use a Job#detail or something like that - stderr.puts(" #{pre} error:") - stderr.puts(" #{pre} #{err.object_id}") - stderr.puts(" #{pre} #{err.class}") - stderr.puts(" #{pre} #{err}") - err.backtrace.each do |l| - stderr.puts(" #{pre} #{l}") - end - stderr.puts(" #{pre} tz:") - stderr.puts(" #{pre} ENV['TZ']: #{ENV['TZ']}") - stderr.puts(" #{pre} Time.now: #{Time.now}") - stderr.puts(" #{pre} local_tzone: #{EoTime.local_tzone.inspect}") - stderr.puts(" #{pre} et-orbi:") - stderr.puts(" #{pre} #{EoTime.platform_info}") - stderr.puts(" #{pre} scheduler:") - stderr.puts(" #{pre} object_id: #{object_id}") - stderr.puts(" #{pre} opts:") - stderr.puts(" #{pre} #{@opts.inspect}") - stderr.puts(" #{pre} frequency: #{self.frequency}") - stderr.puts(" #{pre} scheduler_lock: #{@scheduler_lock.inspect}") - stderr.puts(" #{pre} trigger_lock: #{@trigger_lock.inspect}") - stderr.puts(" #{pre} uptime: #{uptime} (#{uptime_s})") - stderr.puts(" #{pre} down?: #{down?}") - stderr.puts(" #{pre} threads: #{self.threads.size}") - stderr.puts(" #{pre} thread: #{self.thread}") - stderr.puts(" #{pre} thread_key: #{self.thread_key}") - stderr.puts(" #{pre} work_threads: #{work_threads.size}") - stderr.puts(" #{pre} active: #{work_threads(:active).size}") - stderr.puts(" #{pre} vacant: #{work_threads(:vacant).size}") - stderr.puts(" #{pre} max_work_threads: #{max_work_threads}") - stderr.puts(" #{pre} mutexes: #{ms.inspect}") - stderr.puts(" #{pre} jobs: #{jobs.size}") - stderr.puts(" #{pre} at_jobs: #{at_jobs.size}") - stderr.puts(" #{pre} in_jobs: #{in_jobs.size}") - stderr.puts(" #{pre} every_jobs: #{every_jobs.size}") - stderr.puts(" #{pre} interval_jobs: #{interval_jobs.size}") - stderr.puts(" #{pre} cron_jobs: #{cron_jobs.size}") - stderr.puts(" #{pre} running_jobs: #{running_jobs.size}") - stderr.puts(" #{pre} work_queue: #{work_queue.size}") - stderr.puts("} #{pre} .") + protected - rescue => e + def join_shutdown(opts) - stderr.puts("failure in #on_error itself:") - stderr.puts(e.inspect) - stderr.puts(e.backtrace) + limit = opts[:wait] || opts[:join] + limit = limit.is_a?(Numeric) ? limit : nil - ensure + #@started_at = nil + # + # when @started_at is nil, the scheduler thread exits, here + # we want it to exit when all the work threads have been joined + # hence it's set to nil later on + # + @paused_at = EoTime.now - stderr.flush - end + (work_threads.size * 2 + 1).times { @work_queue << :shutdown } - protected + work_threads + .collect { |wt| + wt == Thread.current ? nil : Thread.new { wt.join(limit); wt.kill } } + .each { |st| + st.join if st } - # Returns [ job, job_id ] - # - def fetch(job_or_job_id) + @started_at = nil + end - if job_or_job_id.respond_to?(:job_id) - [ job_or_job_id, job_or_job_id.job_id ] - else - [ job(job_or_job_id), job_or_job_id ] - end - end + def kill_shutdown(opts) - def terminate_all_jobs + @started_at = nil + work_threads.each(&:kill) + end - jobs.each { |j| j.unschedule } + def regular_shutdown(opts) - sleep 0.01 while running_jobs.size > 0 - end + @started_at = nil + end - def join_all_work_threads + def time_limit_join(limit) - work_threads.size.times { @work_queue << :sayonara } + fail ArgumentError.new("limit #{limit.inspect} should be > 0") \ + unless limit.is_a?(Numeric) && limit > 0 - work_threads.each { |t| t.join } + t0 = monow + f = [ limit.to_f / 20, 0.100 ].min - @work_queue.clear + while monow - t0 < limit + r = + begin + @join_queue.pop(true) + rescue ThreadError => e + # #<ThreadError: queue empty> + false + end + return r if r + sleep(f) end - def kill_all_work_threads + nil + end - work_threads.each { |t| t.kill } + def no_time_limit_join + + @join_queue.pop + end + + # Returns [ job, job_id ] + # + def fetch(job_or_job_id) + + if job_or_job_id.respond_to?(:job_id) + [ job_or_job_id, job_or_job_id.job_id ] + else + [ job(job_or_job_id), job_or_job_id ] end + end - #def free_all_work_threads - # - # work_threads.each { |t| t.raise(KillSignal) } - #end + def terminate_all_jobs - def start + jobs.each { |j| j.unschedule } - @started_at = EoTime.now + sleep 0.01 while running_jobs.size > 0 + end - @thread = - Thread.new do + #def free_all_work_threads + # + # work_threads.each { |t| t.raise(KillSignal) } + #end - while @started_at do + def start - unschedule_jobs - trigger_jobs unless @paused - timeout_jobs + @started_at = EoTime.now - sleep(@frequency) - end + @thread = + Thread.new do + + while @started_at do + + unschedule_jobs + trigger_jobs unless @paused_at + timeout_jobs + + sleep(@frequency) end - @thread[@thread_key] = true - @thread[:rufus_scheduler] = self - @thread[:name] = @opts[:thread_name] || "#{@thread_key}_scheduler" - end + rejoin + end - def unschedule_jobs + @thread[@thread_key] = true + @thread[:rufus_scheduler] = self + @thread[:name] = @opts[:thread_name] || "#{@thread_key}_scheduler" + end - @jobs.delete_unscheduled - end + def unschedule_jobs - def trigger_jobs + @jobs.delete_unscheduled + end - now = EoTime.now + def trigger_jobs - @jobs.each(now) do |job| + now = EoTime.now - job.trigger(now) - end + @jobs.each(now) do |job| + + job.trigger(now) end + end - def timeout_jobs + def timeout_jobs - work_threads(:active).each do |t| + work_threads(:active).each do |t| - job = t[:rufus_scheduler_job] - to = t[:rufus_scheduler_timeout] - ts = t[:rufus_scheduler_time] + job = t[:rufus_scheduler_job] + to = t[:rufus_scheduler_timeout] + ts = t[:rufus_scheduler_time] - next unless job && to && ts - # thread might just have become inactive (job -> nil) + next unless job && to && ts + # thread might just have become inactive (job -> nil) - to = ts + to unless to.is_a?(EoTime) + to = ts + to unless to.is_a?(EoTime) - next if to > EoTime.now + next if to > EoTime.now - t.raise(Rufus::Scheduler::TimeoutError) - end + t.raise(Rufus::Scheduler::TimeoutError) end + end - def do_schedule(job_type, t, callable, opts, return_job_instance, block) + def rejoin - fail NotRunningError.new( - 'cannot schedule, scheduler is down or shutting down' - ) if @started_at.nil? + (@join_queue.num_waiting * 2 + 1).times { @join_queue << @thread } + end - callable, opts = nil, callable if callable.is_a?(Hash) - opts = opts.dup unless opts.has_key?(:_t) + def do_schedule(job_type, t, callable, opts, return_job_instance, block) - return_job_instance ||= opts[:job] + fail NotRunningError.new( + 'cannot schedule, scheduler is down or shutting down' + ) if @started_at.nil? - job_class = - case job_type - when :once - opts[:_t] ||= Rufus::Scheduler.parse(t, opts) - opts[:_t].is_a?(Numeric) ? InJob : AtJob - when :every - EveryJob - when :interval - IntervalJob - when :cron - CronJob - end + callable, opts = nil, callable if callable.is_a?(Hash) + opts = opts.dup unless opts.has_key?(:_t) - job = job_class.new(self, t, opts, block || callable) - job.check_frequency + return_job_instance ||= opts[:job] - @jobs.push(job) + job_class = + case job_type + when :once + opts[:_t] ||= Rufus::Scheduler.parse(t, opts) + opts[:_t].is_a?(Numeric) ? InJob : AtJob + when :every + EveryJob + when :interval + IntervalJob + when :cron + CronJob + end - return_job_instance ? job : job.job_id - end + job = job_class.new(self, t, opts, block || callable) + job.check_frequency + + @jobs.push(job) + + return_job_instance ? job : job.job_id end + + def monow; self.class.monow; end + def ltstamp; self.class.ltstamp; end end