lib/rufus/scheduler.rb in rufus-scheduler-1.0.12 vs lib/rufus/scheduler.rb in rufus-scheduler-1.0.13
- old
+ new
@@ -1,1377 +1,3 @@
-#
-#--
-# Copyright (c) 2006-2008, John Mettraux, jmettraux@gmail.com
-#
-# Permission is hereby granted, free of charge, to any person obtaining a copy
-# of this software and associated documentation files (the "Software"), to deal
-# in the Software without restriction, including without limitation the rights
-# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
-# copies of the Software, and to permit persons to whom the Software is
-# furnished to do so, subject to the following conditions:
-#
-# The above copyright notice and this permission notice shall be included in
-# all copies or substantial portions of the Software.
-#
-# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
-# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
-# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
-# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
-# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
-# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
-# THE SOFTWARE.
-#++
-#
-#
-# "made in Japan"
-#
-# John Mettraux at openwfe.org
-#
-
-require 'thread'
-require 'rufus/otime'
-require 'rufus/cronline'
-
-module Rufus
-
- #
- # The Scheduler is used by OpenWFEru for registering 'at' and 'cron' jobs.
- # 'at' jobs to execute once at a given point in time. 'cron' jobs
- # execute a specified intervals.
- # The two main methods are thus schedule_at() and schedule().
- #
- # schedule_at() and schedule() await either a Schedulable instance and
- # params (usually an array or nil), either a block, which is more in the
- # Ruby way.
- #
- # == The gem "rufus-scheduler"
- #
- # This scheduler was previously known as the "openwferu-scheduler" gem.
- #
- # To ensure that code tapping the previous gem still runs fine with
- # "rufus-scheduler", this new gem has 'pointers' for the old class
- # names.
- #
- # require 'rubygems'
- # require 'openwfe/util/scheduler'
- # s = OpenWFE::Scheduler.new
- #
- # will still run OK with "rufus-scheduler".
- #
- # == Examples
- #
- # require 'rubygems'
- # require 'rufus/scheduler'
- #
- # scheduler = Rufus::Scheduler.start_new
- #
- # scheduler.schedule_in("3d") do
- # regenerate_monthly_report()
- # end
- # #
- # # will call the regenerate_monthly_report method
- # # in 3 days from now
- #
- # scheduler.schedule "0 22 * * 1-5" do
- # log.info "activating security system..."
- # activate_security_system()
- # end
- #
- # job_id = scheduler.schedule_at "Sun Oct 07 14:24:01 +0900 2009" do
- # init_self_destruction_sequence()
- # end
- #
- # scheduler.join # join the scheduler (prevents exiting)
- #
- #
- # an example that uses a Schedulable class :
- #
- # class Regenerator < Schedulable
- # def trigger (frequency)
- # self.send(frequency)
- # end
- # def monthly
- # # ...
- # end
- # def yearly
- # # ...
- # end
- # end
- #
- # regenerator = Regenerator.new
- #
- # scheduler.schedule_in("4d", regenerator)
- # #
- # # will regenerate the report in four days
- #
- # scheduler.schedule_in(
- # "5d",
- # { :schedulable => regenerator, :scope => :month })
- # #
- # # will regenerate the monthly report in 5 days
- #
- # There is also schedule_every() :
- #
- # scheduler.schedule_every("1h20m") do
- # regenerate_latest_report()
- # end
- #
- # (note : a schedule every isn't triggered immediately, thus this example
- # will first trigger 1 hour and 20 minutes after being scheduled)
- #
- # The scheduler has a "exit_when_no_more_jobs" attribute. When set to
- # 'true', the scheduler will exit as soon as there are no more jobs to
- # run.
- # Use with care though, if you create a scheduler, set this attribute
- # to true and start the scheduler, the scheduler will immediately exit.
- # This attribute is best used indirectly : the method
- # join_until_no_more_jobs() wraps it.
- #
- # The :scheduler_precision can be set when instantiating the scheduler.
- #
- # scheduler = Rufus::Scheduler.new(:scheduler_precision => 0.500)
- # scheduler.start
- # #
- # # instatiates a scheduler that checks its jobs twice per second
- # # (the default is 4 times per second (0.250))
- #
- # Note that rufus-scheduler places a constraint on the values for the
- # precision : 0.0 < p <= 1.0
- # Thus
- #
- # scheduler.precision = 4.0
- #
- # or
- #
- # scheduler = Rufus::Scheduler.new :scheduler_precision => 5.0
- #
- # will raise an exception.
- #
- #
- # == Tags
- #
- # Tags can be attached to jobs scheduled :
- #
- # scheduler.schedule_in "2h", :tags => "backup" do
- # init_backup_sequence()
- # end
- #
- # scheduler.schedule "0 24 * * *", :tags => "new_day" do
- # do_this_or_that()
- # end
- #
- # jobs = find_jobs 'backup'
- # jobs.each { |job| job.unschedule }
- #
- # Multiple tags may be attached to a single job :
- #
- # scheduler.schedule_in "2h", :tags => [ "backup", "important" ] do
- # init_backup_sequence()
- # end
- #
- # The vanilla case for tags assume they are String instances, but nothing
- # prevents you from using anything else. The scheduler has no persistence
- # by itself, so no serialization issue.
- #
- #
- # == Cron up to the second
- #
- # A cron schedule can be set at the second level :
- #
- # scheduler.schedule "7 * * * * *" do
- # puts "it's now the seventh second of the minute"
- # end
- #
- # The rufus scheduler recognizes an optional first column for second
- # scheduling. This column can, like for the other columns, specify a
- # value ("7"), a list of values ("7,8,9,27") or a range ("7-12").
- #
- #
- # == information passed to schedule blocks
- #
- # When calling schedule_every(), schedule_in() or schedule_at(), the block
- # expects zero or 3 parameters like in
- #
- # scheduler.schedule_every("1h20m") do |job_id, at, params|
- # puts "my job_id is #{job_id}"
- # end
- #
- # For schedule(), zero or two parameters can get passed
- #
- # scheduler.schedule "7 * * * * *" do |job_id, cron_line, params|
- # puts "my job_id is #{job_id}"
- # end
- #
- # In both cases, params corresponds to the params passed to the schedule
- # method (:tags, :first_at, :first_in, :dont_reschedule, ...)
- #
- #
- # == Exceptions
- #
- # The rufus scheduler will output a stacktrace to the STDOUT in
- # case of exception. There are two ways to change that behaviour.
- #
- # # 1 - providing a lwarn method to the scheduler instance :
- #
- # class << scheduler
- # def lwarn (&block)
- # puts "oops, something wrong happened : "
- # puts block.call
- # end
- # end
- #
- # # or
- #
- # def scheduler.lwarn (&block)
- # puts "oops, something wrong happened : "
- # puts block.call
- # end
- #
- # # 2 - overriding the [protected] method log_exception(e) :
- #
- # class << scheduler
- # def log_exception (e)
- # puts "something wrong happened : "+e.to_s
- # end
- # end
- #
- # # or
- #
- # def scheduler.log_exception (e)
- # puts "something wrong happened : "+e.to_s
- # end
- #
- # == 'Every jobs' and rescheduling
- #
- # Every jobs can reschedule/unschedule themselves. A reschedule example :
- #
- # schedule.schedule_every "5h" do |job_id, at, params|
- #
- # mails = $inbox.fetch_mails
- # mails.each { |m| $inbox.mark_as_spam(m) if is_spam(m) }
- #
- # params[:every] = if mails.size > 100
- # "1h" # lots of spam, check every hour
- # else
- # "5h" # normal schedule, every 5 hours
- # end
- # end
- #
- # Unschedule example :
- #
- # schedule.schedule_every "10s" do |job_id, at, params|
- # #
- # # polls every 10 seconds until a mail arrives
- #
- # $mail = $inbox.fetch_last_mail
- #
- # params[:dont_reschedule] = true if $mail
- # end
- #
- # == 'Every jobs', :first_at and :first_in
- #
- # Since rufus-scheduler 1.0.2, the schedule_every methods recognizes two
- # optional parameters, :first_at and :first_in
- #
- # scheduler.schedule_every "2d", :first_in => "5h" do
- # # schedule something every two days, start in 5 hours...
- # end
- #
- # scheduler.schedule_every "2d", :first_at => "5h" do
- # # schedule something every two days, start in 5 hours...
- # end
- #
- # == job.next_time()
- #
- # Jobs, be they at, every or cron have a next_time() method, which tells
- # when the job will be fired next time (for at and in jobs, this is also the
- # last time).
- #
- # For cron jobs, the current implementation is quite brutal. It takes three
- # seconds on my 2006 macbook to reach a cron schedule 1 year away.
- #
- # When is the next friday 13th ?
- #
- # require 'rubygems'
- # require 'rufus/scheduler'
- #
- # puts Rufus::CronLine.new("* * 13 * fri").next_time
- #
- #
- # == :thread_name option
- #
- # You can specify the name of the scheduler's thread. Should make
- # it easier in some debugging situations.
- #
- # scheduler.new :thread_name => "the crazy scheduler"
- #
- #
- # == job.trigger_thread
- #
- # Since rufus-scheduler 1.0.8, you can have access to the thread of
- # a job currently being triggered.
- #
- # job = scheduler.get_job(job_id)
- # thread = job.trigger_thread
- #
- # This new method will return nil if the job is not currently being
- # triggered. Not that in case of an every or cron job, this method
- # will return the thread of the last triggered instance, thus, in case
- # of overlapping executions, you only get the most recent thread.
- #
- #
- # == specifying a :timeout for a job
- #
- # rufus-scheduler 1.0.12 introduces a :timeout parameter for jobs.
- #
- # scheduler.every "3h", :timeout => '2h30m' do
- # do_that_long_job()
- # end
- #
- # after 2 hours and half, the 'long job' will get interrupted by a
- # Rufus::TimeOutError (so that you know what to catch).
- #
- # :timeout is applicable to all types of jobs : at, in, every, cron. It
- # accepts a String value following the "Mdhms" scheme the rufus-scheduler
- # uses.
- #
- class Scheduler
-
- VERSION = '1.0.12'
-
- #
- # By default, the precision is 0.250, with means the scheduler
- # will check for jobs to execute 4 times per second.
- #
- attr_reader :precision
-
- #
- # Setting the precision ( 0.0 < p <= 1.0 )
- #
- def precision= (f)
-
- raise 'precision must be 0.0 < p <= 1.0' \
- if f <= 0.0 or f > 1.0
-
- @precision = f
- end
-
- #--
- # Set by default at 0.00045, it's meant to minimize drift
- #
- #attr_accessor :correction
- #++
-
- #
- # As its name implies.
- #
- attr_accessor :stopped
-
-
- def initialize (params={})
-
- super()
-
- @pending_jobs = []
- @cron_jobs = {}
- @non_cron_jobs = {}
-
- @schedule_queue = Queue.new
- @unschedule_queue = Queue.new
- #
- # sync between the step() method and the [un]schedule
- # methods is done via these queues, no more mutex
-
- @scheduler_thread = nil
-
- @precision = 0.250
- # every 250ms, the scheduler wakes up (default value)
- begin
- self.precision = Float(params[:scheduler_precision])
- rescue Exception => e
- # let precision at its default value
- end
-
- @thread_name = params[:thread_name] || "rufus scheduler"
-
- #@correction = 0.00045
-
- @exit_when_no_more_jobs = false
- #@dont_reschedule_every = false
-
- @last_cron_second = -1
-
- @stopped = true
- end
-
- #
- # Starts this scheduler (or restart it if it was previously stopped)
- #
- def start
-
- @stopped = false
-
- @scheduler_thread = Thread.new do
-
- Thread.current[:name] = @thread_name
-
- if defined?(JRUBY_VERSION)
- require 'java'
- java.lang.Thread.current_thread.name = @thread_name
- end
-
- loop do
-
- break if @stopped
-
- t0 = Time.now.to_f
-
- step
-
- d = Time.now.to_f - t0 # + @correction
-
- next if d > @precision
-
- sleep(@precision - d)
- end
- end
- end
-
- #
- # Instantiates a new Rufus::Scheduler instance, starts it and returns it
- #
- def self.start_new (params = {})
-
- s = self.new(params)
- s.start
- s
- end
-
- #
- # The scheduler is stoppable via sstop()
- #
- def stop
-
- @stopped = true
- end
-
- # (for backward compatibility)
- #
- alias :sstart :start
-
- # (for backward compatibility)
- #
- alias :sstop :stop
-
- #
- # Joins on the scheduler thread
- #
- def join
-
- @scheduler_thread.join
- end
-
- #
- # Like join() but takes care of setting the 'exit_when_no_more_jobs'
- # attribute of this scheduler to true before joining.
- # Thus the scheduler will exit (and the join terminates) as soon as
- # there aren't no more 'at' (or 'every') jobs in the scheduler.
- #
- # Currently used only in unit tests.
- #
- def join_until_no_more_jobs
-
- @exit_when_no_more_jobs = true
- join
- end
-
- #
- # Ensures that a duration is a expressed as a Float instance.
- #
- # duration_to_f("10s")
- #
- # will yield 10.0
- #
- def duration_to_f (s)
-
- Rufus.duration_to_f(s)
- end
-
- #--
- #
- # The scheduling methods
- #
- #++
-
- #
- # Schedules a job by specifying at which time it should trigger.
- # Returns the a job_id that can be used to unschedule the job.
- #
- # This method returns a job identifier which can be used to unschedule()
- # the job.
- #
- # If the job is specified in the past, it will be triggered immediately
- # but not scheduled.
- # To avoid the triggering, the parameter :discard_past may be set to
- # true :
- #
- # jobid = scheduler.schedule_at(yesterday, :discard_past => true) do
- # puts "you'll never read this message"
- # end
- #
- # And 'jobid' will hold a nil (not scheduled).
- #
- #
- def schedule_at (at, params={}, &block)
-
- do_schedule_at(
- at,
- prepare_params(params),
- &block)
- end
-
- #
- # a shortcut for schedule_at
- #
- alias :at :schedule_at
-
-
- #
- # Schedules a job by stating in how much time it should trigger.
- # Returns the a job_id that can be used to unschedule the job.
- #
- # This method returns a job identifier which can be used to unschedule()
- # the job.
- #
- def schedule_in (duration, params={}, &block)
-
- do_schedule_at(
- Time.new.to_f + Rufus.duration_to_f(duration),
- prepare_params(params),
- &block)
- end
-
- #
- # a shortcut for schedule_in
- #
- alias :in :schedule_in
-
- #
- # Schedules a job in a loop. After an execution, it will not execute
- # before the time specified in 'freq'.
- #
- # This method returns a job identifier which can be used to unschedule()
- # the job.
- #
- # In case of exception in the job, it will be rescheduled. If you don't
- # want the job to be rescheduled, set the parameter :try_again to false.
- #
- # scheduler.schedule_every "500", :try_again => false do
- # do_some_prone_to_error_stuff()
- # # won't get rescheduled in case of exception
- # end
- #
- # Since rufus-scheduler 1.0.2, the params :first_at and :first_in are
- # accepted.
- #
- # scheduler.schedule_every "2d", :first_in => "5h" do
- # # schedule something every two days, start in 5 hours...
- # end
- #
- # (without setting a :first_in (or :first_at), our example schedule would
- # have had been triggered after two days).
- #
- def schedule_every (freq, params={}, &block)
-
- params = prepare_params(params)
- params[:every] = freq
-
- first_at = params[:first_at]
- first_in = params[:first_in]
-
- #params[:delayed] = true if first_at or first_in
-
- first_at = if first_at
- at_to_f(first_at)
- elsif first_in
- Time.now.to_f + Rufus.duration_to_f(first_in)
- else
- Time.now.to_f + Rufus.duration_to_f(freq) # not triggering immediately
- end
-
- do_schedule_at(first_at, params, &block)
- end
-
- #
- # a shortcut for schedule_every
- #
- alias :every :schedule_every
-
- #
- # Schedules a cron job, the 'cron_line' is a string
- # following the Unix cron standard (see "man 5 crontab" in your command
- # line, or http://www.google.com/search?q=man%205%20crontab).
- #
- # For example :
- #
- # scheduler.schedule("5 0 * * *", s)
- # # will trigger the schedulable s every day
- # # five minutes after midnight
- #
- # scheduler.schedule("15 14 1 * *", s)
- # # will trigger s at 14:15 on the first of every month
- #
- # scheduler.schedule("0 22 * * 1-5") do
- # puts "it's break time..."
- # end
- # # outputs a message every weekday at 10pm
- #
- # Returns the job id attributed to this 'cron job', this id can
- # be used to unschedule the job.
- #
- # This method returns a job identifier which can be used to unschedule()
- # the job.
- #
- def schedule (cron_line, params={}, &block)
-
- params = prepare_params(params)
-
- #
- # is a job with the same id already scheduled ?
-
- cron_id = params[:cron_id] || params[:job_id]
-
- #@unschedule_queue << cron_id
-
- #
- # schedule
-
- b = to_block(params, &block)
- job = CronJob.new(self, cron_id, cron_line, params, &b)
-
- @schedule_queue << job
-
- job.job_id
- end
-
- #
- # an alias for schedule()
- #
- alias :cron :schedule
-
- #--
- #
- # The UNscheduling methods
- #
- #++
-
- #
- # Unschedules an 'at' or a 'cron' job identified by the id
- # it was given at schedule time.
- #
- def unschedule (job_id)
-
- @unschedule_queue << job_id
- end
-
- #
- # Unschedules a cron job
- #
- # (deprecated : use unschedule(job_id) for all the jobs !)
- #
- def unschedule_cron_job (job_id)
-
- unschedule(job_id)
- end
-
- #--
- #
- # 'query' methods
- #
- #++
-
- #
- # Returns the job corresponding to job_id, an instance of AtJob
- # or CronJob will be returned.
- #
- def get_job (job_id)
-
- @cron_jobs[job_id] || @non_cron_jobs[job_id]
- end
-
- #
- # Finds a job (via get_job()) and then returns the wrapped
- # schedulable if any.
- #
- def get_schedulable (job_id)
-
- j = get_job(job_id)
- j.respond_to?(:schedulable) ? j.schedulable : nil
- end
-
- #
- # Returns an array of jobs that have the given tag.
- #
- def find_jobs (tag=nil)
-
- jobs = @cron_jobs.values + @non_cron_jobs.values
- jobs = jobs.select { |job| job.has_tag?(tag) } if tag
- jobs
- end
-
- #
- # Returns all the jobs in the scheduler.
- #
- def all_jobs
-
- find_jobs()
- end
-
- #
- # Finds the jobs with the given tag and then returns an array of
- # the wrapped Schedulable objects.
- # Jobs that haven't a wrapped Schedulable won't be included in the
- # result.
- #
- def find_schedulables (tag)
-
- find_jobs(tag).find_all { |job| job.respond_to?(:schedulable) }
- end
-
- #
- # Returns the number of currently pending jobs in this scheduler
- # ('at' jobs and 'every' jobs).
- #
- def pending_job_count
-
- @pending_jobs.size
- end
-
- #
- # Returns the number of cron jobs currently active in this scheduler.
- #
- def cron_job_count
-
- @cron_jobs.size
- end
-
- #
- # Returns the current count of 'every' jobs scheduled.
- #
- def every_job_count
-
- @non_cron_jobs.values.select { |j| j.class == EveryJob }.size
- end
-
- #
- # Returns the current count of 'at' jobs scheduled (not 'every').
- #
- def at_job_count
-
- @non_cron_jobs.values.select { |j| j.class == AtJob }.size
- end
-
- #
- # Returns true if the given string seems to be a cron string.
- #
- def self.is_cron_string (s)
-
- s.match ".+ .+ .+ .+ .+" # well...
- end
-
- private
-
- #
- # the unschedule work itself.
- #
- def do_unschedule (job_id)
-
- job = get_job job_id
-
- return (@cron_jobs.delete(job_id) != nil) if job.is_a?(CronJob)
-
- return false unless job # not found
-
- if job.is_a?(AtJob) # catches AtJob and EveryJob instances
- @non_cron_jobs.delete(job_id)
- job.params[:dont_reschedule] = true # for AtJob as well, no worries
- end
-
- for i in 0...@pending_jobs.length
- if @pending_jobs[i].job_id == job_id
- @pending_jobs.delete_at i
- return true # asap
- end
- end
-
- true
- end
-
- #
- # Making sure that params is a Hash.
- #
- def prepare_params (params)
-
- params.is_a?(Schedulable) ? { :schedulable => params } : params
- end
-
- #
- # The core method behind schedule_at and schedule_in (and also
- # schedule_every). It's protected, don't use it directly.
- #
- def do_schedule_at (at, params={}, &block)
-
- job = params.delete(:job)
-
- unless job
-
- jobClass = params[:every] ? EveryJob : AtJob
-
- b = to_block(params, &block)
-
- job = jobClass.new(self, at_to_f(at), params[:job_id], params, &b)
- end
-
- if jobClass == AtJob && job.at < (Time.new.to_f + @precision)
-
- job.trigger() unless params[:discard_past]
-
- @non_cron_jobs.delete job.job_id # just to be sure
-
- return nil
- end
-
- @non_cron_jobs[job.job_id] = job
-
- @schedule_queue << job
-
- job.job_id
- end
-
- #
- # Ensures an 'at' instance is translated to a float
- # (to be compared with the float coming from time.to_f)
- #
- def at_to_f (at)
-
- at = Rufus::to_ruby_time(at) if at.kind_of?(String)
- at = Rufus::to_gm_time(at) if at.kind_of?(DateTime)
- at = at.to_f if at.kind_of?(Time)
-
- raise "cannot schedule at : #{at.inspect}" unless at.is_a?(Float)
-
- at
- end
-
- #
- # Returns a block. If a block is passed, will return it, else,
- # if a :schedulable is set in the params, will return a block
- # wrapping a call to it.
- #
- def to_block (params, &block)
-
- return block if block
-
- schedulable = params.delete(:schedulable)
-
- return nil unless schedulable
-
- l = lambda do
- schedulable.trigger(params)
- end
- class << l
- attr_accessor :schedulable
- end
- l.schedulable = schedulable
-
- l
- end
-
- #
- # Pushes an 'at' job into the pending job list
- #
- def push_pending_job (job)
-
- old = @pending_jobs.find { |j| j.job_id == job.job_id }
- @pending_jobs.delete(old) if old
- #
- # override previous job with same id
-
- if @pending_jobs.length < 1 or job.at >= @pending_jobs.last.at
- @pending_jobs << job
- return
- end
-
- for i in 0...@pending_jobs.length
- if job.at <= @pending_jobs[i].at
- @pending_jobs[i, 0] = job
- return # right place found
- end
- end
- end
-
- #
- # This is the method called each time the scheduler wakes up
- # (by default 4 times per second). It's meant to quickly
- # determine if there are jobs to trigger else to get back to sleep.
- # 'cron' jobs get executed if necessary then 'at' jobs.
- #
- def step
-
- step_unschedule
- # unschedules any job in the unschedule queue before
- # they have a chance to get triggered.
-
- step_trigger
- # triggers eligible jobs
-
- step_schedule
- # schedule new jobs
-
- # done.
- end
-
- #
- # unschedules jobs in the unschedule_queue
- #
- def step_unschedule
-
- loop do
-
- break if @unschedule_queue.empty?
-
- do_unschedule(@unschedule_queue.pop)
- end
- end
-
- #
- # adds every job waiting in the @schedule_queue to
- # either @pending_jobs or @cron_jobs.
- #
- def step_schedule
-
- loop do
-
- break if @schedule_queue.empty?
-
- j = @schedule_queue.pop
-
- if j.is_a?(CronJob)
-
- @cron_jobs[j.job_id] = j
-
- else # it's an 'at' job
-
- push_pending_job j
- end
- end
- end
-
- #
- # triggers every eligible pending (at or every) jobs, then every eligible
- # cron jobs.
- #
- def step_trigger
-
- now = Time.now
-
- if @exit_when_no_more_jobs && @pending_jobs.size < 1
-
- @stopped = true
- return
- end
-
- # TODO : eventually consider running cron / pending
- # job triggering in two different threads
- #
- # but well... there's the synchronization issue...
-
- #
- # cron jobs
-
- if now.sec != @last_cron_second
-
- @last_cron_second = now.sec
-
- @cron_jobs.each do |cron_id, cron_job|
- #trigger(cron_job) if cron_job.matches?(now, @precision)
- cron_job.trigger if cron_job.matches?(now)
- end
- end
-
- #
- # pending jobs
-
- now = now.to_f
- #
- # that's what at jobs do understand
-
- loop do
-
- break if @pending_jobs.length < 1
-
- job = @pending_jobs[0]
-
- break if job.at > now
-
- #if job.at <= now
- #
- # obviously
-
- job.trigger
-
- @pending_jobs.delete_at 0
- end
- end
-
- #
- # If an error occurs in the job, it well get caught and an error
- # message will be displayed to STDOUT.
- # If this scheduler provides a lwarn(message) method, it will
- # be used insted.
- #
- # Of course, one can override this method.
- #
- def log_exception (e)
-
- message =
- "trigger() caught exception\n" +
- e.to_s + "\n" +
- e.backtrace.join("\n")
-
- if self.respond_to?(:lwarn)
- lwarn { message }
- else
- puts message
- end
- end
- end
-
- #
- # This module adds a trigger method to any class that includes it.
- # The default implementation feature here triggers an exception.
- #
- module Schedulable
-
- def trigger (params)
- raise "trigger() implementation is missing"
- end
-
- def reschedule (scheduler)
- raise "reschedule() implentation is missing"
- end
- end
-
- #
- # This error is thrown when the :timeout attribute triggers
- #
- class TimeOutError < RuntimeError
- end
-
- protected
-
- JOB_ID_LOCK = Mutex.new
- #
- # would it be better to use a Mutex instead of a full-blown
- # Monitor ?
-
- #
- # The parent class for scheduled jobs.
- #
- class Job
-
- @@last_given_id = 0
- #
- # as a scheduler is fully transient, no need to
- # have persistent ids, a simple counter is sufficient
-
- #
- # The identifier for the job
- #
- attr_accessor :job_id
-
- #
- # An array of tags
- #
- attr_accessor :tags
-
- #
- # The block to execute at trigger time
- #
- attr_accessor :block
-
- #
- # A reference to the scheduler
- #
- attr_reader :scheduler
-
- #
- # Keeping a copy of the initialization params of the job.
- #
- attr_reader :params
-
- #
- # if the job is currently executing, this field points to
- # the 'trigger thread'
- #
- attr_reader :trigger_thread
-
-
- def initialize (scheduler, job_id, params, &block)
-
- @scheduler = scheduler
- @block = block
-
- if job_id
- @job_id = job_id
- else
- JOB_ID_LOCK.synchronize do
- @job_id = @@last_given_id
- @@last_given_id = @job_id + 1
- end
- end
-
- @params = params
-
- #@tags = Array(tags).collect { |tag| tag.to_s }
- # making sure we have an array of String tags
-
- @tags = Array(params[:tags])
- # any tag is OK
- end
-
- #
- # Returns true if this job sports the given tag
- #
- def has_tag? (tag)
-
- @tags.include?(tag)
- end
-
- #
- # Removes (cancels) this job from its scheduler.
- #
- def unschedule
-
- @scheduler.unschedule(@job_id)
- end
-
- #
- # Triggers the job (in a dedicated thread).
- #
- def trigger
-
- Thread.new do
-
- @trigger_thread = Thread.current
- # keeping track of the thread
-
- begin
-
- do_trigger
-
- rescue Exception => e
-
- @scheduler.send(:log_exception, e)
- end
-
- #@trigger_thread = nil if @trigger_thread = Thread.current
- @trigger_thread = nil
- # overlapping executions, what to do ?
- end
-
- if trigger_thread_alive? and (to = @params[:timeout])
- @scheduler.in(to, :tags => 'timeout') do
- @trigger_thread.raise(Rufus::TimeOutError) if trigger_thread_alive?
- end
- end
- end
-
- protected
-
- def trigger_thread_alive?
- (@trigger_thread && @trigger_thread.alive?)
- end
- end
-
- #
- # An 'at' job.
- #
- class AtJob < Job
-
- #
- # The float representation (Time.to_f) of the time at which
- # the job should be triggered.
- #
- attr_accessor :at
-
-
- def initialize (scheduler, at, at_id, params, &block)
-
- super(scheduler, at_id, params, &block)
- @at = at
- end
-
- #
- # Returns the Time instance at which this job is scheduled.
- #
- def schedule_info
-
- Time.at(@at)
- end
-
- #
- # next_time is last_time (except for EveryJob instances). Returns
- # a Time instance.
- #
- def next_time
-
- schedule_info
- end
-
- protected
-
- #
- # Triggers the job (calls the block)
- #
- def do_trigger
-
- @block.call @job_id, @at
-
- @scheduler.instance_variable_get(:@non_cron_jobs).delete @job_id
- end
- end
-
- #
- # An 'every' job is simply an extension of an 'at' job.
- #
- class EveryJob < AtJob
-
- #
- # Returns the frequency string used to schedule this EveryJob,
- # like for example "3d" or "1M10d3h".
- #
- def schedule_info
-
- @params[:every]
- end
-
- protected
-
- #
- # triggers the job, then reschedules it if necessary
- #
- def do_trigger
-
- hit_exception = false
-
- begin
-
- @block.call @job_id, @at, @params
-
- rescue Exception => e
-
- @scheduler.send(:log_exception, e)
-
- hit_exception = true
- end
-
- if \
- @scheduler.instance_variable_get(:@exit_when_no_more_jobs) or
- (@params[:dont_reschedule] == true) or
- (hit_exception and @params[:try_again] == false)
-
- @scheduler.instance_variable_get(:@non_cron_jobs).delete(job_id)
- # maybe it'd be better to wipe that reference from here anyway...
-
- return
- end
-
- #
- # ok, reschedule ...
-
-
- params[:job] = self
-
- @at = @at + Rufus.duration_to_f(params[:every])
-
- @scheduler.send(:do_schedule_at, @at, params)
- end
- end
-
- #
- # A cron job.
- #
- class CronJob < Job
-
- #
- # The CronLine instance representing the times at which
- # the cron job has to be triggered.
- #
- attr_accessor :cron_line
-
- def initialize (scheduler, cron_id, line, params, &block)
-
- super(scheduler, cron_id, params, &block)
-
- if line.is_a?(String)
-
- @cron_line = CronLine.new(line)
-
- elsif line.is_a?(CronLine)
-
- @cron_line = line
-
- else
-
- raise \
- "Cannot initialize a CronJob " +
- "with a param of class #{line.class}"
- end
- end
-
- #
- # This is the method called by the scheduler to determine if it
- # has to fire this CronJob instance.
- #
- def matches? (time)
- #def matches? (time, precision)
-
- #@cron_line.matches?(time, precision)
- @cron_line.matches?(time)
- end
-
- #
- # Returns the original cron tab string used to schedule this
- # Job. Like for example "60/3 * * * Sun".
- #
- def schedule_info
-
- @cron_line.original
- end
-
- #
- # Returns a Time instance : the next time this cron job is
- # supposed to "fire".
- #
- # 'from' is used to specify the starting point for determining
- # what will be the next time. Defaults to now.
- #
- def next_time (from=Time.now)
-
- @cron_line.next_time(from)
- end
-
- protected
-
- #
- # As the name implies.
- #
- def do_trigger
-
- @block.call @job_id, @cron_line, @params
- end
- end
-
-end
+require 'rufus/scheduler/scheduler'