lib/rufus/scheduler.rb in rufus-scheduler-2.0.24 vs lib/rufus/scheduler.rb in rufus-scheduler-3.0.0

- old
+ new

@@ -20,43 +20,519 @@ # THE SOFTWARE. # # Made in Japan. #++ +require 'date' if RUBY_VERSION < '1.9.0' +require 'time' +require 'thread' +require 'tzinfo' +require 'fileutils' -require 'rufus/sc/scheduler' +module Rufus -module Rufus::Scheduler + class Scheduler - # Starts and return a new instance of a PlainScheduler. - # - def self.new(opts={}) + require 'rufus/scheduler/util' + require 'rufus/scheduler/jobs' + require 'rufus/scheduler/cronline' + require 'rufus/scheduler/job_array' - PlainScheduler.start_new(opts) - end + VERSION = '3.0.0' - # A quick way to get a scheduler up an running - # - # require 'rubygems' - # s = Rufus::Scheduler.start_new - # - # If EventMachine is present and running will create an EmScheduler, else - # it will create a PlainScheduler instance. - # - def self.start_new(opts={}) + # + # This error is thrown when the :timeout attribute triggers + # + class TimeoutError < StandardError; end - if defined?(EM) and EM.reactor_running? - EmScheduler.start_new(opts) - else - PlainScheduler.start_new(opts) + #MIN_WORK_THREADS = 7 + MAX_WORK_THREADS = 35 + + attr_accessor :frequency + attr_reader :started_at + attr_reader :thread + attr_reader :thread_key + attr_reader :mutexes + + #attr_accessor :min_work_threads + attr_accessor :max_work_threads + + attr_accessor :stderr + + attr_reader :work_queue + + def initialize(opts={}) + + @opts = opts + + @started_at = nil + @paused = false + + @jobs = JobArray.new + + @frequency = Rufus::Scheduler.parse(opts[:frequency] || 0.300) + @mutexes = {} + + @work_queue = Queue.new + + #@min_work_threads = opts[:min_work_threads] || MIN_WORK_THREADS + @max_work_threads = opts[:max_work_threads] || MAX_WORK_THREADS + + @stderr = $stderr + + @thread_key = "rufus_scheduler_#{self.object_id}" + + consider_lockfile || return + + start end - end - # Returns true if the given string seems to be a cron string. - # - def self.is_cron_string(s) + # Returns a singleton Rufus::Scheduler instance + # + def self.singleton(opts={}) - s.match(/.+ .+ .+ .+ .+/) # well... + @singleton ||= Rufus::Scheduler.new(opts) + end + + # Alias for Rufus::Scheduler.singleton + # + def self.s(opts={}); singleton(opts); end + + # Releasing the gem would probably require redirecting .start_new to + # .new and emit a simple deprecation message. + # + # For now, let's assume the people pointing at rufus-scheduler/master + # on GitHub know what they do... + # + def self.start_new + + fail "this is rufus-scheduler 3.0, use .new instead of .start_new" + end + + def shutdown(opt=nil) + + @started_at = nil + + jobs.each { |j| j.unschedule } + + @work_queue.clear + + if opt == :wait + join_all_work_threads + elsif opt == :kill + kill_all_work_threads + end + + @lockfile.flock(File::LOCK_UN) if @lockfile + end + + alias stop shutdown + + def uptime + + @started_at ? Time.now - @started_at : nil + end + + def uptime_s + + self.class.to_duration(uptime) + end + + def join + + @thread.join + end + + def paused? + + @paused + end + + def pause + + @paused = true + end + + def resume + + @paused = false + end + + #-- + # scheduling methods + #++ + + def at(time, callable=nil, opts={}, &block) + + do_schedule(:once, time, callable, opts, opts[:job], block) + end + + def schedule_at(time, callable=nil, opts={}, &block) + + do_schedule(:once, time, callable, opts, true, block) + end + + def in(duration, callable=nil, opts={}, &block) + + do_schedule(:once, duration, callable, opts, opts[:job], block) + end + + def schedule_in(duration, callable=nil, opts={}, &block) + + do_schedule(:once, duration, callable, opts, true, block) + end + + def every(duration, callable=nil, opts={}, &block) + + do_schedule(:every, duration, callable, opts, opts[:job], block) + end + + def schedule_every(duration, callable=nil, opts={}, &block) + + do_schedule(:every, duration, callable, opts, true, block) + end + + def interval(duration, callable=nil, opts={}, &block) + + do_schedule(:interval, duration, callable, opts, opts[:job], block) + end + + def schedule_interval(duration, callable=nil, opts={}, &block) + + do_schedule(:interval, duration, callable, opts, true, block) + end + + def cron(cronline, callable=nil, opts={}, &block) + + do_schedule(:cron, cronline, callable, opts, opts[:job], block) + end + + def schedule_cron(cronline, callable=nil, opts={}, &block) + + do_schedule(:cron, cronline, callable, opts, true, block) + end + + def schedule(arg, callable=nil, opts={}, &block) + + # TODO: eventually, spare one parse call + + case Scheduler.parse(arg) + when CronLine then schedule_cron(arg, callable, opts, &block) + when Time then schedule_at(arg, callable, opts, &block) + else schedule_in(arg, callable, opts, &block) + end + end + + def repeat(arg, callable=nil, opts={}, &block) + + # TODO: eventually, spare one parse call + + case Scheduler.parse(arg) + when CronLine then schedule_cron(arg, callable, opts, &block) + else schedule_every(arg, callable, opts, &block) + end + end + + def unschedule(job_or_job_id) + + job, job_id = fetch(job_or_job_id) + + fail ArgumentError.new("no job found with id '#{job_id}'") unless job + + job.unschedule if job + end + + #-- + # jobs methods + #++ + + # Returns all the scheduled jobs + # (even those right before re-schedule). + # + def jobs(opts={}) + + opts = { opts => true } if opts.is_a?(Symbol) + + jobs = @jobs.to_a + + if opts[:running] + jobs = jobs.select { |j| j.running? } + elsif ! opts[:all] + jobs = jobs.reject { |j| j.next_time.nil? || j.unscheduled_at } + end + + tags = Array(opts[:tag] || opts[:tags]).collect { |t| t.to_s } + jobs = jobs.reject { |j| tags.find { |t| ! j.tags.include?(t) } } + + jobs + end + + def at_jobs(opts={}) + + jobs(opts).select { |j| j.is_a?(Rufus::Scheduler::AtJob) } + end + + def in_jobs(opts={}) + + jobs(opts).select { |j| j.is_a?(Rufus::Scheduler::InJob) } + end + + def every_jobs(opts={}) + + jobs(opts).select { |j| j.is_a?(Rufus::Scheduler::EveryJob) } + end + + def interval_jobs(opts={}) + + jobs(opts).select { |j| j.is_a?(Rufus::Scheduler::IntervalJob) } + end + + def cron_jobs(opts={}) + + jobs(opts).select { |j| j.is_a?(Rufus::Scheduler::CronJob) } + end + + def job(job_id) + + @jobs[job_id] + end + + # Returns true if this job is currently scheduled. + # + # Takes extra care to answer true if the job is a repeat job + # currently firing. + # + def scheduled?(job_or_job_id) + + job, job_id = fetch(job_or_job_id) + + !! (job && job.next_time != nil) + end + + # Lists all the threads associated with this scheduler. + # + def threads + + Thread.list.select { |t| t[thread_key] } + end + + # Lists all the work threads (the ones actually running the scheduled + # block code) + # + # Accepts a query option, which can be set to: + # * :all (default), returns all the threads that are work threads + # or are currently running a job + # * :active, returns all threads that are currenly running a job + # * :vacant, returns the threads that are not running a job + # + # If, thanks to :blocking => true, a job is scheduled to monopolize the + # main scheduler thread, that thread will get returned when :active or + # :all. + # + def work_threads(query=:all) + + ts = + threads.select { |t| + t[:rufus_scheduler_job] || t[:rufus_scheduler_work_thread] + } + + case query + when :active then ts.select { |t| t[:rufus_scheduler_job] } + when :vacant then ts.reject { |t| t[:rufus_scheduler_job] } + else ts + end + end + + def running_jobs(opts={}) + + jobs(opts.merge(:running => true)) + end + + def on_error(job, err) + + pre = err.object_id.to_s + + stderr.puts("{ #{pre} rufus-scheduler intercepted an error:") + stderr.puts(" #{pre} job:") + stderr.puts(" #{pre} #{job.class} #{job.original.inspect} #{job.opts.inspect}") + stderr.puts(" #{pre} error:") + stderr.puts(" #{pre} #{err.object_id}") + stderr.puts(" #{pre} #{err.class}") + stderr.puts(" #{pre} #{err}") + err.backtrace.each do |l| + stderr.puts(" #{pre} #{l}") + end + stderr.puts("} #{pre} .") + + rescue => e + + stderr.puts("failure in #on_error itself:") + stderr.puts(e.inspect) + stderr.puts(e.backtrace) + + ensure + + stderr.flush + end + + protected + + # Returns [ job, job_id ] + # + def fetch(job_or_job_id) + + if job_or_job_id.respond_to?(:job_id) + [ job_or_job_id, job_or_job_id.job_id ] + else + [ job(job_or_job_id), job_or_job_id ] + end + end + + def consider_lockfile + + @lockfile = nil + + return true unless f = @opts[:lockfile] + + raise ArgumentError.new( + ":lockfile argument must be a string, not a #{f.class}" + ) unless f.is_a?(String) + + FileUtils.mkdir_p(File.dirname(f)) + + f = File.new(f, File::RDWR | File::CREAT) + locked = f.flock(File::LOCK_NB | File::LOCK_EX) + + return false unless locked + + now = Time.now + + f.print("pid: #{$$}, ") + f.print("scheduler.object_id: #{self.object_id}, ") + f.print("time: #{now}, ") + f.print("timestamp: #{now.to_f}") + f.flush + + @lockfile = f + + true + end + + def terminate_all_jobs + + jobs.each { |j| j.unschedule } + + sleep 0.01 while running_jobs.size > 0 + end + + def join_all_work_threads + + work_threads.size.times { @work_queue << :sayonara } + + work_threads.each { |t| t.join } + + @work_queue.clear + end + + def kill_all_work_threads + + work_threads.each { |t| t.kill } + end + + #def free_all_work_threads + # + # work_threads.each { |t| t.raise(KillSignal) } + #end + + def start + + @started_at = Time.now + + @thread = + Thread.new do + + while @started_at do + + unschedule_jobs + trigger_jobs unless @paused + timeout_jobs + + sleep(@frequency) + end + end + + @thread[@thread_key] = true + @thread[:rufus_scheduler] = self + @thread[:name] = @opts[:thread_name] || "#{@thread_key}_scheduler" + end + + def unschedule_jobs + + @jobs.delete_unscheduled + end + + def trigger_jobs + + now = Time.now + + @jobs.each(now) do |job| + + job.trigger(now) + end + end + + def timeout_jobs + + work_threads(:active).each do |t| + + job = t[:rufus_scheduler_job] + to = t[:rufus_scheduler_timeout] + + next unless job && to + # thread might just have become inactive (job -> nil) + + ts = t[:rufus_scheduler_time] + to = to.is_a?(Time) ? to : ts + to + + next if to > Time.now + + t.raise(Rufus::Scheduler::TimeoutError) + end + end + + def do_schedule(job_type, t, callable, opts, return_job_instance, block) + + raise RuntimeError.new( + 'cannot schedule, scheduler is down or shutting down' + ) if @started_at == nil + + callable, opts = nil, callable if callable.is_a?(Hash) + return_job_instance ||= opts[:job] + + job_class = + case job_type + when :once + tt = Rufus::Scheduler.parse(t) + tt.is_a?(Time) ? AtJob : InJob + when :every + EveryJob + when :interval + IntervalJob + when :cron + CronJob + end + + job = job_class.new(self, t, opts, block || callable) + + raise ArgumentError.new( + "job frequency (#{job.frequency}) is higher than " + + "scheduler frequency (#{@frequency})" + ) if job.respond_to?(:frequency) && job.frequency < @frequency + + @jobs.push(job) + + return_job_instance ? job : job.job_id + end end end