lib/rufus/scheduler.rb in rufus-scheduler-1.0.5 vs lib/rufus/scheduler.rb in rufus-scheduler-1.0.6
- old
+ new
@@ -6,14 +6,14 @@
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
-#
+#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
-#
+#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
@@ -33,1399 +33,1417 @@
require 'rufus/otime'
module Rufus
+ #
+ # The Scheduler is used by OpenWFEru for registering 'at' and 'cron' jobs.
+ # 'at' jobs to execute once at a given point in time. 'cron' jobs
+ # execute a specified intervals.
+ # The two main methods are thus schedule_at() and schedule().
+ #
+ # schedule_at() and schedule() await either a Schedulable instance and
+ # params (usually an array or nil), either a block, which is more in the
+ # Ruby way.
+ #
+ # == The gem "openwferu-scheduler"
+ #
+ # This scheduler was previously known as the "openwferu-scheduler" gem.
+ #
+ # To ensure that code tapping the previous gem still runs fine with
+ # "rufus-scheduler", this new gem has 'pointers' for the old class
+ # names.
+ #
+ # require 'rubygems'
+ # require 'openwfe/util/scheduler'
+ # s = OpenWFE::Scheduler.new
+ #
+ # will still run OK with "rufus-scheduler".
+ #
+ # == Examples
+ #
+ # require 'rubygems'
+ # require 'rufus/scheduler'
+ #
+ #
+ # scheduler.schedule_in("3d") do
+ # regenerate_monthly_report()
+ # end
+ # #
+ # # will call the regenerate_monthly_report method
+ # # in 3 days from now
+ #
+ # scheduler.schedule "0 22 * * 1-5" do
+ # log.info "activating security system..."
+ # activate_security_system()
+ # end
+ #
+ # job_id = scheduler.schedule_at "Sun Oct 07 14:24:01 +0900 2009" do
+ # init_self_destruction_sequence()
+ # end
+ #
+ # an example that uses a Schedulable class :
+ #
+ # class Regenerator < Schedulable
+ # def trigger (frequency)
+ # self.send(frequency)
+ # end
+ # def monthly
+ # # ...
+ # end
+ # def yearly
+ # # ...
+ # end
+ # end
+ #
+ # regenerator = Regenerator.new
+ #
+ # scheduler.schedule_in("4d", regenerator)
+ # #
+ # # will regenerate the report in four days
+ #
+ # scheduler.schedule_in(
+ # "5d",
+ # { :schedulable => regenerator, :scope => :month })
+ # #
+ # # will regenerate the monthly report in 5 days
+ #
+ # There is also schedule_every() :
+ #
+ # scheduler.schedule_every("1h20m") do
+ # regenerate_latest_report()
+ # end
+ #
+ # The scheduler has a "exit_when_no_more_jobs" attribute. When set to
+ # 'true', the scheduler will exit as soon as there are no more jobs to
+ # run.
+ # Use with care though, if you create a scheduler, set this attribute
+ # to true and start the scheduler, the scheduler will immediately exit.
+ # This attribute is best used indirectly : the method
+ # join_until_no_more_jobs() wraps it.
+ #
+ # The :scheduler_precision can be set when instantiating the scheduler.
+ #
+ # scheduler = Rufus::Scheduler.new(:scheduler_precision => 0.500)
+ # scheduler.start
+ # #
+ # # instatiates a scheduler that checks its jobs twice per second
+ # # (the default is 4 times per second (0.250))
+ #
+ # Note that rufus-scheduler places a constraint on the values for the
+ # precision : 0.0 < p <= 1.0
+ # Thus
+ #
+ # scheduler.precision = 4.0
+ #
+ # or
+ #
+ # scheduler = Rufus::Scheduler.new :scheduler_precision => 5.0
+ #
+ # will raise an exception.
+ #
+ #
+ # == Tags
+ #
+ # Tags can be attached to jobs scheduled :
+ #
+ # scheduler.schedule_in "2h", :tags => "backup" do
+ # init_backup_sequence()
+ # end
+ #
+ # scheduler.schedule "0 24 * * *", :tags => "new_day" do
+ # do_this_or_that()
+ # end
+ #
+ # jobs = find_jobs 'backup'
+ # jobs.each { |job| job.unschedule }
+ #
+ # Multiple tags may be attached to a single job :
+ #
+ # scheduler.schedule_in "2h", :tags => [ "backup", "important" ] do
+ # init_backup_sequence()
+ # end
+ #
+ # The vanilla case for tags assume they are String instances, but nothing
+ # prevents you from using anything else. The scheduler has no persistence
+ # by itself, so no serialization issue.
+ #
+ #
+ # == Cron up to the second
+ #
+ # A cron schedule can be set at the second level :
+ #
+ # scheduler.schedule "7 * * * * *" do
+ # puts "it's now the seventh second of the minute"
+ # end
+ #
+ # The rufus scheduler recognizes an optional first column for second
+ # scheduling. This column can, like for the other columns, specify a
+ # value ("7"), a list of values ("7,8,9,27") or a range ("7-12").
+ #
+ # == Exceptions
+ #
+ # The rufus scheduler will output a stacktrace to the STDOUT in
+ # case of exception. There are two ways to change that behaviour.
+ #
+ # # 1 - providing a lwarn method to the scheduler instance :
+ #
+ # class << scheduler
+ # def lwarn (&block)
+ # puts "oops, something wrong happened : "
+ # puts block.call
+ # end
+ # end
+ #
+ # # 2 - overriding the [protected] method log_exception(e) :
+ #
+ # class << scheduler
+ # def log_exception (e)
+ # puts "something wrong happened : "+e.to_s
+ # end
+ # end
+ #
+ # == 'Every jobs' and rescheduling
+ #
+ # Every jobs can reschedule/unschedule themselves. A reschedule example :
+ #
+ # schedule.schedule_every "5h" do |job_id, at, params|
+ #
+ # mails = $inbox.fetch_mails
+ # mails.each { |m| $inbox.mark_as_spam(m) if is_spam(m) }
+ #
+ # params[:every] = if mails.size > 100
+ # "1h" # lots of spam, check every hour
+ # else
+ # "5h" # normal schedule, every 5 hours
+ # end
+ # end
+ #
+ # Unschedule example :
+ #
+ # schedule.schedule_every "10s" do |job_id, at, params|
+ # #
+ # # polls every 10 seconds until a mail arrives
+ #
+ # $mail = $inbox.fetch_last_mail
+ #
+ # params[:dont_reschedule] = true if $mail
+ # end
+ #
+ # == 'Every jobs', :first_at and :first_in
+ #
+ # Since rufus-scheduler 1.0.2, the schedule_every methods recognizes two
+ # optional parameters, :first_at and :first_in
+ #
+ # scheduler.schedule_every "2d", :first_in => "5h" do
+ # # schedule something every two days, start in 5 hours...
+ # end
+ #
+ # scheduler.schedule_every "2d", :first_at => "5h" do
+ # # schedule something every two days, start in 5 hours...
+ # end
+ #
+ # == :thread_name option
+ #
+ # You can specify the name of the scheduler's thread. Should make
+ # it easier in some debugging situations.
+ #
+ # scheduler.new :thread_name => "the crazy scheduler"
+ #
+ class Scheduler
+
#
- # The Scheduler is used by OpenWFEru for registering 'at' and 'cron' jobs.
- # 'at' jobs to execute once at a given point in time. 'cron' jobs
- # execute a specified intervals.
- # The two main methods are thus schedule_at() and schedule().
+ # By default, the precision is 0.250, with means the scheduler
+ # will check for jobs to execute 4 times per second.
#
- # schedule_at() and schedule() await either a Schedulable instance and
- # params (usually an array or nil), either a block, which is more in the
- # Ruby way.
+ attr_reader :precision
+
#
- # == The gem "openwferu-scheduler"
+ # Setting the precision ( 0.0 < p <= 1.0 )
#
- # This scheduler was previously known as the "openwferu-scheduler" gem.
- #
- # To ensure that code tapping the previous gem still runs fine with
- # "rufus-scheduler", this new gem has 'pointers' for the old class
- # names.
+ def precision= (f)
+
+ raise "precision must be 0.0 < p <= 1.0" \
+ if f <= 0.0 or f > 1.0
+
+ @precision = f
+ end
+
+ #--
+ # Set by default at 0.00045, it's meant to minimize drift
#
- # require 'rubygems'
- # require 'openwfe/util/scheduler'
- # s = OpenWFE::Scheduler.new
+ #attr_accessor :correction
+ #++
+
#
- # will still run OK with "rufus-scheduler".
+ # As its name implies.
#
- # == Examples
+ attr_accessor :stopped
+
+
+ def initialize (params={})
+
+ super()
+
+ @pending_jobs = []
+ @cron_jobs = {}
+
+ @schedule_queue = Queue.new
+ @unschedule_queue = Queue.new
+ #
+ # sync between the step() method and the [un]schedule
+ # methods is done via these queues, no more mutex
+
+ @scheduler_thread = nil
+
+ @precision = 0.250
+ # every 250ms, the scheduler wakes up (default value)
+ begin
+ self.precision = Float(params[:scheduler_precision])
+ rescue Exception => e
+ # let precision at its default value
+ end
+
+ @thread_name = params[:thread_name] || "rufus scheduler"
+
+ #@correction = 0.00045
+
+ @exit_when_no_more_jobs = false
+ @dont_reschedule_every = false
+
+ @last_cron_second = -1
+
+ @stopped = true
+ end
+
#
- # require 'rubygems'
- # require 'rufus/scheduler'
- #
- #
- # scheduler.schedule_in("3d") do
- # regenerate_monthly_report()
- # end
- # #
- # # will call the regenerate_monthly_report method
- # # in 3 days from now
+ # Starts this scheduler (or restart it if it was previously stopped)
#
- # scheduler.schedule "0 22 * * 1-5" do
- # log.info "activating security system..."
- # activate_security_system()
- # end
+ def start
+
+ @stopped = false
+
+ @scheduler_thread = Thread.new do
+
+ Thread.current[:name] = @thread_name
+
+ if defined?(JRUBY_VERSION)
+ require 'java'
+ java.lang.Thread.current_thread.name = @thread_name
+ end
+
+ loop do
+
+ break if @stopped
+
+ t0 = Time.now.to_f
+
+ step
+
+ d = Time.now.to_f - t0 # + @correction
+
+ next if d > @precision
+
+ sleep (@precision - d)
+ end
+ end
+ end
+
#
- # job_id = scheduler.schedule_at "Sun Oct 07 14:24:01 +0900 2009" do
- # init_self_destruction_sequence()
- # end
+ # The scheduler is stoppable via sstop()
#
- # an example that uses a Schedulable class :
+ def stop
+
+ @stopped = true
+ end
+
+ # (for backward compatibility)
#
- # class Regenerator < Schedulable
- # def trigger (frequency)
- # self.send(frequency)
- # end
- # def monthly
- # # ...
- # end
- # def yearly
- # # ...
- # end
- # end
+ alias :sstart :start
+
+ # (for backward compatibility)
#
- # regenerator = Regenerator.new
+ alias :sstop :stop
+
#
- # scheduler.schedule_in("4d", regenerator)
- # #
- # # will regenerate the report in four days
+ # Joins on the scheduler thread
#
- # scheduler.schedule_in(
- # "5d",
- # { :schedulable => regenerator, :scope => :month })
- # #
- # # will regenerate the monthly report in 5 days
+ def join
+
+ @scheduler_thread.join
+ end
+
#
- # There is also schedule_every() :
+ # Like join() but takes care of setting the 'exit_when_no_more_jobs'
+ # attribute of this scheduler to true before joining.
+ # Thus the scheduler will exit (and the join terminates) as soon as
+ # there aren't no more 'at' (or 'every') jobs in the scheduler.
#
- # scheduler.schedule_every("1h20m") do
- # regenerate_latest_report()
- # end
+ # Currently used only in unit tests.
#
- # The scheduler has a "exit_when_no_more_jobs" attribute. When set to
- # 'true', the scheduler will exit as soon as there are no more jobs to
- # run.
- # Use with care though, if you create a scheduler, set this attribute
- # to true and start the scheduler, the scheduler will immediately exit.
- # This attribute is best used indirectly : the method
- # join_until_no_more_jobs() wraps it.
+ def join_until_no_more_jobs
+
+ @exit_when_no_more_jobs = true
+ join
+ end
+
+ #--
#
- # The :scheduler_precision can be set when instantiating the scheduler.
+ # The scheduling methods
#
- # scheduler = Rufus::Scheduler.new(:scheduler_precision => 0.500)
- # scheduler.start
- # #
- # # instatiates a scheduler that checks its jobs twice per second
- # # (the default is 4 times per second (0.250))
+ #++
+
#
+ # Schedules a job by specifying at which time it should trigger.
+ # Returns the a job_id that can be used to unschedule the job.
#
- # == Tags
+ # This method returns a job identifier which can be used to unschedule()
+ # the job.
#
- # Tags can be attached to jobs scheduled :
+ # If the job is specified in the past, it will be triggered immediately
+ # but not scheduled.
+ # To avoid the triggering, the parameter :discard_past may be set to
+ # true :
#
- # scheduler.schedule_in "2h", :tags => "backup" do
- # init_backup_sequence()
- # end
+ # jobid = scheduler.schedule_at(yesterday, :discard_past => true) do
+ # puts "you'll never read this message"
+ # end
#
- # scheduler.schedule "0 24 * * *", :tags => "new_day" do
- # do_this_or_that()
- # end
+ # And 'jobid' will hold a nil (not scheduled).
#
- # jobs = find_jobs 'backup'
- # jobs.each { |job| job.unschedule }
#
- # Multiple tags may be attached to a single job :
+ def schedule_at (at, params={}, &block)
+
+ do_schedule_at(
+ at,
+ prepare_params(params),
+ &block)
+ end
+
+
#
- # scheduler.schedule_in "2h", :tags => [ "backup", "important" ] do
- # init_backup_sequence()
- # end
+ # Schedules a job by stating in how much time it should trigger.
+ # Returns the a job_id that can be used to unschedule the job.
#
- # The vanilla case for tags assume they are String instances, but nothing
- # prevents you from using anything else. The scheduler has no persistence
- # by itself, so no serialization issue.
+ # This method returns a job identifier which can be used to unschedule()
+ # the job.
#
+ def schedule_in (duration, params={}, &block)
+
+ do_schedule_at(
+ Time.new.to_f + duration_to_f(duration),
+ prepare_params(params),
+ &block)
+ end
+
#
- # == Cron up to the second
+ # Schedules a job in a loop. After an execution, it will not execute
+ # before the time specified in 'freq'.
#
- # A cron schedule can be set at the second level :
+ # This method returns a job identifier which can be used to unschedule()
+ # the job.
#
- # scheduler.schedule "7 * * * * *" do
- # puts "it's now the seventh second of the minute"
- # end
+ # In case of exception in the job, it will be rescheduled. If you don't
+ # want the job to be rescheduled, set the parameter :try_again to false.
#
- # The rufus scheduler recognizes an optional first column for second
- # scheduling. This column can, like for the other columns, specify a
- # value ("7"), a list of values ("7,8,9,27") or a range ("7-12").
+ # scheduler.schedule_every "500", :try_again => false do
+ # do_some_prone_to_error_stuff()
+ # # won't get rescheduled in case of exception
+ # end
#
- # == Exceptions
- #
- # The rufus scheduler will output a stacktrace to the STDOUT in
- # case of exception. There are two ways to change that behaviour.
+ # Since rufus-scheduler 1.0.2, the params :first_at and :first_in are
+ # accepted.
#
- # # 1 - providing a lwarn method to the scheduler instance :
- #
- # class << scheduler
- # def lwarn (&block)
- # puts "oops, something wrong happened : "
- # puts block.call
- # end
- # end
+ # scheduler.schedule_every "2d", :first_in => "5h" do
+ # # schedule something every two days, start in 5 hours...
+ # end
#
- # # 2 - overriding the [protected] method log_exception(e) :
- #
- # class << scheduler
- # def log_exception (e)
- # puts "something wrong happened : "+e.to_s
- # end
- # end
- #
- # == 'Every jobs' and rescheduling
- #
- # Every jobs can reschedule/unschedule themselves. A reschedule example :
- #
- # schedule.schedule_every "5h" do |job_id, at, params|
- #
- # mails = $inbox.fetch_mails
- # mails.each { |m| $inbox.mark_as_spam(m) if is_spam(m) }
- #
- # params[:every] = if mails.size > 100
- # "1h" # lots of spam, check every hour
- # else
- # "5h" # normal schedule, every 5 hours
- # end
- # end
- #
- # Unschedule example :
- #
- # schedule.schedule_every "10s" do |job_id, at, params|
- # #
- # # polls every 10 seconds until a mail arrives
- #
- # $mail = $inbox.fetch_last_mail
- #
- # params[:dont_reschedule] = true if $mail
- # end
- #
- # == 'Every jobs', :first_at and :first_in
- #
- # Since rufus-scheduler 1.0.2, the schedule_every methods recognizes two
- # optional parameters, :first_at and :first_in
- #
- # scheduler.schedule_every "2d", :first_in => "5h" do
- # # schedule something every two days, start in 5 hours...
- # end
- #
- # scheduler.schedule_every "2d", :first_at => "5h" do
- # # schedule something every two days, start in 5 hours...
- # end
- #
- class Scheduler
+ def schedule_every (freq, params={}, &block)
- #
- # By default, the precision is 0.250, with means the scheduler
- # will check for jobs to execute 4 times per second.
- #
- attr_reader :precision
+ f = duration_to_f freq
- #
- # Setting the precision ( 0.0 < p <= 1.0 )
- #
- def precision= (f)
+ params = prepare_params params
+ schedulable = params[:schedulable]
+ params[:every] = freq
- raise "precision must be 0.0 < p <= 1.0" \
- if f <= 0.0 or f > 1.0
+ first_at = params.delete :first_at
+ first_in = params.delete :first_in
- @precision = f
- end
+ previous_at = params[:previous_at]
- #--
- # Set by default at 0.00045, it's meant to minimize drift
- #
- #attr_accessor :correction
- #++
+ next_at = if first_at
+ first_at
+ elsif first_in
+ Time.now.to_f + duration_to_f(first_in)
+ elsif previous_at
+ previous_at + f
+ else
+ Time.now.to_f + f
+ end
+ do_schedule_at(next_at, params) do |job_id, at|
+
#
- # As its name implies.
- #
- attr_accessor :stopped
+ # trigger ...
+ hit_exception = false
- def initialize (params={})
+ begin
- super()
+ if schedulable
+ schedulable.trigger params
+ else
+ block.call job_id, at, params
+ end
- @pending_jobs = []
- @cron_jobs = {}
+ rescue Exception => e
- @schedule_queue = Queue.new
- @unschedule_queue = Queue.new
- #
- # sync between the step() method and the [un]schedule
- # methods is done via these queues, no more mutex
+ log_exception e
- @scheduler_thread = nil
+ hit_exception = true
+ end
- @precision = 0.250
- # every 250ms, the scheduler wakes up (default value)
- begin
- self.precision = Float(params[:scheduler_precision])
- rescue Exception => e
- # let precision at its default value
- end
+ # cannot use a return here !!! (block)
- #@correction = 0.00045
+ unless \
+ @dont_reschedule_every or
+ (params[:dont_reschedule] == true) or
+ (hit_exception and params[:try_again] == false)
- @exit_when_no_more_jobs = false
- @dont_reschedule_every = false
+ #
+ # ok, reschedule ...
- @last_cron_second = -1
+ params[:job_id] = job_id
+ params[:previous_at] = at
- @stopped = true
+ schedule_every params[:every], params, &block
+ #
+ # yes, this is a kind of recursion
+
+ # note that params[:every] might have been changed
+ # by the block/schedulable code
end
- #
- # Starts this scheduler (or restart it if it was previously stopped)
- #
- def start
+ job_id
+ end
+ end
- @stopped = false
+ #
+ # Schedules a cron job, the 'cron_line' is a string
+ # following the Unix cron standard (see "man 5 crontab" in your command
+ # line, or http://www.google.com/search?q=man%205%20crontab).
+ #
+ # For example :
+ #
+ # scheduler.schedule("5 0 * * *", s)
+ # # will trigger the schedulable s every day
+ # # five minutes after midnight
+ #
+ # scheduler.schedule("15 14 1 * *", s)
+ # # will trigger s at 14:15 on the first of every month
+ #
+ # scheduler.schedule("0 22 * * 1-5") do
+ # puts "it's break time..."
+ # end
+ # # outputs a message every weekday at 10pm
+ #
+ # Returns the job id attributed to this 'cron job', this id can
+ # be used to unschedule the job.
+ #
+ # This method returns a job identifier which can be used to unschedule()
+ # the job.
+ #
+ def schedule (cron_line, params={}, &block)
- @scheduler_thread = Thread.new do
+ params = prepare_params(params)
- if defined?(JRUBY_VERSION)
+ #
+ # is a job with the same id already scheduled ?
- require 'java'
+ cron_id = params[:cron_id]
+ cron_id = params[:job_id] unless cron_id
- java.lang.Thread.current_thread.name = \
- "openwferu scheduler (Ruby Thread)"
- end
+ #unschedule(cron_id) if cron_id
+ @unschedule_queue << [ :cron, cron_id ]
- loop do
+ #
+ # schedule
- break if @stopped
+ b = to_block(params, &block)
+ job = CronJob.new(self, cron_id, cron_line, params, &b)
- t0 = Time.now.to_f
+ #@cron_jobs[job.job_id] = job
+ @schedule_queue << job
- step
+ job.job_id
+ end
- d = Time.now.to_f - t0 # + @correction
+ #--
+ #
+ # The UNscheduling methods
+ #
+ #++
- next if d > @precision
+ #
+ # Unschedules an 'at' or a 'cron' job identified by the id
+ # it was given at schedule time.
+ #
+ def unschedule (job_id)
- sleep (@precision - d)
- end
- end
- end
+ @unschedule_queue << [ :at, job_id ]
+ end
- #
- # The scheduler is stoppable via sstop()
- #
- def stop
+ #
+ # Unschedules a cron job
+ #
+ def unschedule_cron_job (job_id)
- @stopped = true
- end
+ @unschedule_queue << [ :cron, job_id ]
+ end
- # (for backward compatibility)
- #
- alias :sstart :start
+ #--
+ #
+ # 'query' methods
+ #
+ #++
- # (for backward compatibility)
- #
- alias :sstop :stop
+ #
+ # Returns the job corresponding to job_id, an instance of AtJob
+ # or CronJob will be returned.
+ #
+ def get_job (job_id)
- #
- # Joins on the scheduler thread
- #
- def join
+ job = @cron_jobs[job_id]
+ return job if job
- @scheduler_thread.join
- end
+ @pending_jobs.find do |job|
+ job.job_id == job_id
+ end
+ end
- #
- # Like join() but takes care of setting the 'exit_when_no_more_jobs'
- # attribute of this scheduler to true before joining.
- # Thus the scheduler will exit (and the join terminates) as soon as
- # there aren't no more 'at' (or 'every') jobs in the scheduler.
- #
- # Currently used only in unit tests.
- #
- def join_until_no_more_jobs
+ #
+ # Finds a job (via get_job()) and then returns the wrapped
+ # schedulable if any.
+ #
+ def get_schedulable (job_id)
- @exit_when_no_more_jobs = true
- join
- end
+ #return nil unless job_id
- #--
- #
- # The scheduling methods
- #
- #++
+ j = get_job(job_id)
- #
- # Schedules a job by specifying at which time it should trigger.
- # Returns the a job_id that can be used to unschedule the job.
- #
- # This method returns a job identifier which can be used to unschedule()
- # the job.
- #
- # If the job is specified in the past, it will be triggered immediately
- # but not scheduled.
- # To avoid the triggering, the parameter :discard_past may be set to
- # true :
- #
- # jobid = scheduler.schedule_at(yesterday, :discard_past => true) do
- # puts "you'll never read this message"
- # end
- #
- # And 'jobid' will hold a nil (not scheduled).
- #
- #
- def schedule_at (at, params={}, &block)
+ return j.schedulable if j.respond_to?(:schedulable)
- do_schedule_at(
- at,
- prepare_params(params),
- &block)
- end
+ nil
+ end
+ #
+ # Returns an array of jobs that have the given tag.
+ #
+ def find_jobs (tag)
- #
- # Schedules a job by stating in how much time it should trigger.
- # Returns the a job_id that can be used to unschedule the job.
- #
- # This method returns a job identifier which can be used to unschedule()
- # the job.
- #
- def schedule_in (duration, params={}, &block)
+ result = @cron_jobs.values.find_all do |job|
+ job.has_tag?(tag)
+ end
- do_schedule_at(
- Time.new.to_f + duration_to_f(duration),
- prepare_params(params),
- &block)
- end
+ result + @pending_jobs.find_all do |job|
+ job.has_tag?(tag)
+ end
+ end
- #
- # Schedules a job in a loop. After an execution, it will not execute
- # before the time specified in 'freq'.
- #
- # This method returns a job identifier which can be used to unschedule()
- # the job.
- #
- # In case of exception in the job, it will be rescheduled. If you don't
- # want the job to be rescheduled, set the parameter :try_again to false.
- #
- # scheduler.schedule_every "500", :try_again => false do
- # do_some_prone_to_error_stuff()
- # # won't get rescheduled in case of exception
- # end
- #
- # Since rufus-scheduler 1.0.2, the params :first_at and :first_in are
- # accepted.
- #
- # scheduler.schedule_every "2d", :first_in => "5h" do
- # # schedule something every two days, start in 5 hours...
- # end
- #
- def schedule_every (freq, params={}, &block)
+ #
+ # Finds the jobs with the given tag and then returns an array of
+ # the wrapped Schedulable objects.
+ # Jobs that haven't a wrapped Schedulable won't be included in the
+ # result.
+ #
+ def find_schedulables (tag)
- f = duration_to_f freq
+ #jobs = find_jobs(tag)
+ #result = []
+ #jobs.each do |job|
+ # result.push(job.schedulable) if job.respond_to?(:schedulable)
+ #end
+ #result
- params = prepare_params params
- schedulable = params[:schedulable]
- params[:every] = freq
+ find_jobs(tags).inject([]) do |result, job|
- first_at = params.delete :first_at
- first_in = params.delete :first_in
+ result.push(job.schedulable) if job.respond_to?(:schedulable)
+ result
+ end
+ end
- previous_at = params[:previous_at]
+ #
+ # Returns the number of currently pending jobs in this scheduler
+ # ('at' jobs and 'every' jobs).
+ #
+ def pending_job_count
- next_at = if first_at
- first_at
- elsif first_in
- Time.now.to_f + duration_to_f(first_in)
- elsif previous_at
- previous_at + f
- else
- Time.now.to_f + f
- end
+ @pending_jobs.size
+ end
- do_schedule_at(next_at, params) do |job_id, at|
+ #
+ # Returns the number of cron jobs currently active in this scheduler.
+ #
+ def cron_job_count
- #
- # trigger ...
+ @cron_jobs.size
+ end
- hit_exception = false
+ #
+ # Returns the current count of 'every' jobs scheduled.
+ #
+ def every_job_count
- begin
+ @pending_jobs.select { |j| j.is_a?(EveryJob) }.size
+ end
- if schedulable
- schedulable.trigger params
- else
- block.call job_id, at, params
- end
+ #
+ # Returns the current count of 'at' jobs scheduled (not 'every').
+ #
+ def at_job_count
- rescue Exception => e
+ @pending_jobs.select { |j| j.instance_of?(AtJob) }.size
+ end
- log_exception e
+ #
+ # Returns true if the given string seems to be a cron string.
+ #
+ def Scheduler.is_cron_string (s)
- hit_exception = true
- end
+ s.match ".+ .+ .+ .+ .+"
+ end
- # cannot use a return here !!! (block)
+ #protected
+ private
- unless \
- @dont_reschedule_every or
- (params[:dont_reschedule] == true) or
- (hit_exception and params[:try_again] == false)
+ def do_unschedule (job_id)
- #
- # ok, reschedule ...
-
- params[:job_id] = job_id
- params[:previous_at] = at
-
- schedule_every params[:every], params, &block
- #
- # yes, this is a kind of recursion
-
- # note that params[:every] might have been changed
- # by the block/schedulable code
- end
-
- job_id
- end
+ for i in 0...@pending_jobs.length
+ if @pending_jobs[i].job_id == job_id
+ @pending_jobs.delete_at i
+ return true
+ end
end
+ #
+ # not using delete_if because it scans the whole list
- #
- # Schedules a cron job, the 'cron_line' is a string
- # following the Unix cron standard (see "man 5 crontab" in your command
- # line, or http://www.google.com/search?q=man%205%20crontab).
- #
- # For example :
- #
- # scheduler.schedule("5 0 * * *", s)
- # # will trigger the schedulable s every day
- # # five minutes after midnight
- #
- # scheduler.schedule("15 14 1 * *", s)
- # # will trigger s at 14:15 on the first of every month
- #
- # scheduler.schedule("0 22 * * 1-5") do
- # puts "it's break time..."
- # end
- # # outputs a message every weekday at 10pm
- #
- # Returns the job id attributed to this 'cron job', this id can
- # be used to unschedule the job.
- #
- # This method returns a job identifier which can be used to unschedule()
- # the job.
- #
- def schedule (cron_line, params={}, &block)
+ do_unschedule_cron_job job_id
+ end
- params = prepare_params(params)
-
- #
- # is a job with the same id already scheduled ?
+ def do_unschedule_cron_job (job_id)
- cron_id = params[:cron_id]
- cron_id = params[:job_id] unless cron_id
+ (@cron_jobs.delete(job_id) != nil)
+ end
- #unschedule(cron_id) if cron_id
- @unschedule_queue << [ :cron, cron_id ]
+ #
+ # Making sure that params is a Hash.
+ #
+ def prepare_params (params)
- #
- # schedule
+ params = { :schedulable => params } \
+ if params.is_a?(Schedulable)
+ params
+ end
- b = to_block(params, &block)
- job = CronJob.new(self, cron_id, cron_line, params, &b)
+ #
+ # The core method behind schedule_at and schedule_in (and also
+ # schedule_every). It's protected, don't use it directly.
+ #
+ def do_schedule_at (at, params={}, &block)
- #@cron_jobs[job.job_id] = job
- @schedule_queue << job
+ #puts "0 at is '#{at.to_s}' (#{at.class})"
- job.job_id
- end
+ at = at_to_f at
- #--
- #
- # The UNscheduling methods
- #
- #++
+ #puts "1 at is '#{at.to_s}' (#{at.class})"}"
- #
- # Unschedules an 'at' or a 'cron' job identified by the id
- # it was given at schedule time.
- #
- def unschedule (job_id)
-
- @unschedule_queue << [ :at, job_id ]
- end
+ jobClass = params[:every] ? EveryJob : AtJob
- #
- # Unschedules a cron job
- #
- def unschedule_cron_job (job_id)
+ job_id = params[:job_id]
- @unschedule_queue << [ :cron, job_id ]
- end
+ b = to_block params, &block
- #--
- #
- # 'query' methods
- #
- #++
+ job = jobClass.new self, at, job_id, params, &b
- #
- # Returns the job corresponding to job_id, an instance of AtJob
- # or CronJob will be returned.
- #
- def get_job (job_id)
+ #do_unschedule(job_id) if job_id
- job = @cron_jobs[job_id]
- return job if job
+ if at < (Time.new.to_f + @precision)
- @pending_jobs.find do |job|
- job.job_id == job_id
- end
+ job.trigger() unless params[:discard_past]
+ return nil
end
- #
- # Finds a job (via get_job()) and then returns the wrapped
- # schedulable if any.
- #
- def get_schedulable (job_id)
+ @schedule_queue << job
- #return nil unless job_id
+ job.job_id
+ end
- j = get_job(job_id)
+ #
+ # Ensures that a duration is a expressed as a Float instance.
+ #
+ # duration_to_f("10s")
+ #
+ # will yields 10.0
+ #
+ def duration_to_f (s)
- return j.schedulable if j.respond_to?(:schedulable)
+ return s if s.kind_of?(Float)
+ return Rufus::parse_time_string(s) if s.kind_of?(String)
+ Float(s.to_s)
+ end
- nil
- end
+ #
+ # Ensures an 'at' instance is translated to a float
+ # (to be compared with the float coming from time.to_f)
+ #
+ def at_to_f (at)
- #
- # Returns an array of jobs that have the given tag.
- #
- def find_jobs (tag)
+ at = Rufus::to_ruby_time(at) if at.kind_of?(String)
+ at = Rufus::to_gm_time(at) if at.kind_of?(DateTime)
+ at = at.to_f if at.kind_of?(Time)
+ at
+ end
- result = @cron_jobs.values.find_all do |job|
- job.has_tag?(tag)
- end
+ #
+ # Returns a block. If a block is passed, will return it, else,
+ # if a :schedulable is set in the params, will return a block
+ # wrapping a call to it.
+ #
+ def to_block (params, &block)
- result + @pending_jobs.find_all do |job|
- job.has_tag?(tag)
- end
- end
+ return block if block
- #
- # Finds the jobs with the given tag and then returns an array of
- # the wrapped Schedulable objects.
- # Jobs that haven't a wrapped Schedulable won't be included in the
- # result.
- #
- def find_schedulables (tag)
+ schedulable = params[:schedulable]
- #jobs = find_jobs(tag)
- #result = []
- #jobs.each do |job|
- # result.push(job.schedulable) if job.respond_to?(:schedulable)
- #end
- #result
+ return nil unless schedulable
- find_jobs(tags).inject([]) do |result, job|
+ params.delete :schedulable
- result.push(job.schedulable) if job.respond_to?(:schedulable)
- result
- end
+ l = lambda do
+ schedulable.trigger(params)
end
-
- #
- # Returns the number of currently pending jobs in this scheduler
- # ('at' jobs and 'every' jobs).
- #
- def pending_job_count
-
- @pending_jobs.size
+ class << l
+ attr_accessor :schedulable
end
+ l.schedulable = schedulable
- #
- # Returns the number of cron jobs currently active in this scheduler.
- #
- def cron_job_count
+ l
+ end
- @cron_jobs.size
- end
+ #
+ # Pushes an 'at' job into the pending job list
+ #
+ def push_pending_job (job)
- #
- # Returns the current count of 'every' jobs scheduled.
- #
- def every_job_count
+ old = @pending_jobs.find { |j| j.job_id == job.job_id }
+ @pending_jobs.delete(old) if old
+ #
+ # override previous job with same id
- @pending_jobs.select { |j| j.is_a?(EveryJob) }.size
+ if @pending_jobs.length < 1 or job.at >= @pending_jobs.last.at
+ @pending_jobs << job
+ return
end
- #
- # Returns the current count of 'at' jobs scheduled (not 'every').
- #
- def at_job_count
-
- @pending_jobs.select { |j| j.instance_of?(AtJob) }.size
+ for i in 0...@pending_jobs.length
+ if job.at <= @pending_jobs[i].at
+ @pending_jobs[i, 0] = job
+ return # right place found
+ end
end
+ end
- #
- # Returns true if the given string seems to be a cron string.
- #
- def Scheduler.is_cron_string (s)
+ #
+ # This is the method called each time the scheduler wakes up
+ # (by default 4 times per second). It's meant to quickly
+ # determine if there are jobs to trigger else to get back to sleep.
+ # 'cron' jobs get executed if necessary then 'at' jobs.
+ #
+ def step
- s.match ".+ .+ .+ .+ .+"
- end
+ #puts Time.now.to_f
+ #puts @pending_jobs.collect { |j| [ j.job_id, j.at ] }.inspect
- #protected
- private
+ step_unschedule
+ # unschedules any job in the unschedule queue before
+ # they have a chance to get triggered.
- def do_unschedule (job_id)
+ step_trigger
+ # triggers eligible jobs
- for i in 0...@pending_jobs.length
- if @pending_jobs[i].job_id == job_id
- @pending_jobs.delete_at i
- return true
- end
- end
- #
- # not using delete_if because it scans the whole list
+ step_schedule
+ # schedule new jobs
- do_unschedule_cron_job job_id
- end
+ # done.
+ end
- def do_unschedule_cron_job (job_id)
+ #
+ # unschedules jobs in the unschedule_queue
+ #
+ def step_unschedule
- (@cron_jobs.delete(job_id) != nil)
- end
+ loop do
- #
- # Making sure that params is a Hash.
- #
- def prepare_params (params)
+ break if @unschedule_queue.empty?
- params = { :schedulable => params } \
- if params.is_a?(Schedulable)
- params
- end
+ type, job_id = @unschedule_queue.pop
- #
- # The core method behind schedule_at and schedule_in (and also
- # schedule_every). It's protected, don't use it directly.
- #
- def do_schedule_at (at, params={}, &block)
+ if type == :cron
- #puts "0 at is '#{at.to_s}' (#{at.class})"
+ do_unschedule_cron_job job_id
+ else
- at = at_to_f at
+ do_unschedule job_id
+ end
+ end
+ end
- #puts "1 at is '#{at.to_s}' (#{at.class})"}"
+ #
+ # adds every job waiting in the @schedule_queue to
+ # either @pending_jobs or @cron_jobs.
+ #
+ def step_schedule
- jobClass = params[:every] ? EveryJob : AtJob
+ loop do
- job_id = params[:job_id]
+ break if @schedule_queue.empty?
- b = to_block params, &block
+ j = @schedule_queue.pop
- job = jobClass.new self, at, job_id, params, &b
+ if j.is_a?(CronJob)
- #do_unschedule(job_id) if job_id
+ @cron_jobs[j.job_id] = j
- if at < (Time.new.to_f + @precision)
+ else # it's an 'at' job
- job.trigger() unless params[:discard_past]
- return nil
- end
+ push_pending_job j
+ end
+ end
+ end
- @schedule_queue << job
+ #
+ # triggers every eligible pending jobs, then every eligible
+ # cron jobs.
+ #
+ def step_trigger
- job.job_id
- end
+ now = Time.new
- #
- # Ensures that a duration is a expressed as a Float instance.
- #
- # duration_to_f("10s")
- #
- # will yields 10.0
- #
- def duration_to_f (s)
+ if @exit_when_no_more_jobs
- return s if s.kind_of?(Float)
- return Rufus::parse_time_string(s) if s.kind_of?(String)
- Float(s.to_s)
- end
+ if @pending_jobs.size < 1
- #
- # Ensures an 'at' instance is translated to a float
- # (to be compared with the float coming from time.to_f)
- #
- def at_to_f (at)
+ @stopped = true
+ return
+ end
- at = Rufus::to_ruby_time(at) if at.kind_of?(String)
- at = Rufus::to_gm_time(at) if at.kind_of?(DateTime)
- at = at.to_f if at.kind_of?(Time)
- at
- end
+ @dont_reschedule_every = true if at_job_count < 1
+ end
- #
- # Returns a block. If a block is passed, will return it, else,
- # if a :schedulable is set in the params, will return a block
- # wrapping a call to it.
- #
- def to_block (params, &block)
+ # TODO : eventually consider running cron / pending
+ # job triggering in two different threads
+ #
+ # but well... there's the synchronization issue...
- return block if block
+ #
+ # cron jobs
- schedulable = params[:schedulable]
+ if now.sec != @last_cron_second
- return nil unless schedulable
+ @last_cron_second = now.sec
- params.delete :schedulable
+ #puts "step() @cron_jobs.size #{@cron_jobs.size}"
- l = lambda do
- schedulable.trigger(params)
- end
- class << l
- attr_accessor :schedulable
- end
- l.schedulable = schedulable
+ @cron_jobs.each do |cron_id, cron_job|
+ #puts "step() cron_id : #{cron_id}"
+ #trigger(cron_job) if cron_job.matches?(now, @precision)
+ trigger(cron_job) if cron_job.matches?(now)
+ end
+ end
- l
- end
+ #
+ # pending jobs
- #
- # Pushes an 'at' job into the pending job list
- #
- def push_pending_job (job)
+ now = now.to_f
+ #
+ # that's what at jobs do understand
- old = @pending_jobs.find { |j| j.job_id == job.job_id }
- @pending_jobs.delete(old) if old
- #
- # override previous job with same id
+ loop do
- if @pending_jobs.length < 1 or job.at >= @pending_jobs.last.at
- @pending_jobs << job
- return
- end
+ break if @pending_jobs.length < 1
- for i in 0...@pending_jobs.length
- if job.at <= @pending_jobs[i].at
- @pending_jobs[i, 0] = job
- return # right place found
- end
- end
- end
+ job = @pending_jobs[0]
- #
- # This is the method called each time the scheduler wakes up
- # (by default 4 times per second). It's meant to quickly
- # determine if there are jobs to trigger else to get back to sleep.
- # 'cron' jobs get executed if necessary then 'at' jobs.
- #
- def step
+ break if job.at > now
- #puts Time.now.to_f
- #puts @pending_jobs.collect { |j| [ j.job_id, j.at ] }.inspect
-
- step_unschedule
- # unschedules any job in the unschedule queue before
- # they have a chance to get triggered.
-
- step_trigger
- # triggers eligible jobs
-
- step_schedule
- # schedule new jobs
-
- # done.
- end
-
+ #if job.at <= now
#
- # unschedules jobs in the unschedule_queue
- #
- def step_unschedule
+ # obviously
- loop do
+ trigger job
- break if @unschedule_queue.empty?
+ @pending_jobs.delete_at 0
+ end
+ end
- type, job_id = @unschedule_queue.pop
+ #
+ # Triggers the job (in a dedicated thread).
+ #
+ def trigger (job)
- if type == :cron
+ Thread.new do
+ begin
- do_unschedule_cron_job job_id
- else
-
- do_unschedule job_id
- end
- end
- end
+ job.trigger
- #
- # adds every job waiting in the @schedule_queue to
- # either @pending_jobs or @cron_jobs.
- #
- def step_schedule
+ rescue Exception => e
- loop do
+ log_exception e
+ end
+ end
+ end
- break if @schedule_queue.empty?
+ #
+ # If an error occurs in the job, it well get caught and an error
+ # message will be displayed to STDOUT.
+ # If this scheduler provides a lwarn(message) method, it will
+ # be used insted.
+ #
+ # Of course, one can override this method.
+ #
+ def log_exception (e)
- j = @schedule_queue.pop
+ message =
+ "trigger() caught exception\n" +
+ e.to_s + "\n" +
+ e.backtrace.join("\n")
- if j.is_a?(CronJob)
+ if self.respond_to?(:lwarn)
+ lwarn { message }
+ else
+ puts message
+ end
+ end
+ end
- @cron_jobs[j.job_id] = j
+ #
+ # This module adds a trigger method to any class that includes it.
+ # The default implementation feature here triggers an exception.
+ #
+ module Schedulable
- else # it's an 'at' job
+ def trigger (params)
+ raise "trigger() implementation is missing"
+ end
- push_pending_job j
- end
- end
- end
+ def reschedule (scheduler)
+ raise "reschedule() implentation is missing"
+ end
+ end
- #
- # triggers every eligible pending jobs, then every eligible
- # cron jobs.
- #
- def step_trigger
+ protected
- now = Time.new
+ JOB_ID_LOCK = Monitor.new
+ #
+ # would it be better to use a Mutex instead of a full-blown
+ # Monitor ?
- if @exit_when_no_more_jobs
+ #
+ # The parent class for scheduled jobs.
+ #
+ class Job
- if @pending_jobs.size < 1
+ @@last_given_id = 0
+ #
+ # as a scheduler is fully transient, no need to
+ # have persistent ids, a simple counter is sufficient
- @stopped = true
- return
- end
+ #
+ # The identifier for the job
+ #
+ attr_accessor :job_id
- @dont_reschedule_every = true if at_job_count < 1
- end
+ #
+ # An array of tags
+ #
+ attr_accessor :tags
- # TODO : eventually consider running cron / pending
- # job triggering in two different threads
- #
- # but well... there's the synchronization issue...
+ #
+ # The block to execute at trigger time
+ #
+ attr_accessor :block
- #
- # cron jobs
+ #
+ # A reference to the scheduler
+ #
+ attr_reader :scheduler
- if now.sec != @last_cron_second
+ #
+ # Keeping a copy of the initialization params of the job.
+ #
+ attr_reader :params
- @last_cron_second = now.sec
- #puts "step() @cron_jobs.size #{@cron_jobs.size}"
+ def initialize (scheduler, job_id, params, &block)
- @cron_jobs.each do |cron_id, cron_job|
- #puts "step() cron_id : #{cron_id}"
- #trigger(cron_job) if cron_job.matches?(now, @precision)
- trigger(cron_job) if cron_job.matches?(now)
- end
- end
+ @scheduler = scheduler
+ @block = block
- #
- # pending jobs
+ if job_id
+ @job_id = job_id
+ else
+ JOB_ID_LOCK.synchronize do
+ @job_id = @@last_given_id
+ @@last_given_id = @job_id + 1
+ end
+ end
- now = now.to_f
- #
- # that's what at jobs do understand
+ @params = params
- loop do
+ #@tags = Array(tags).collect { |tag| tag.to_s }
+ # making sure we have an array of String tags
- break if @pending_jobs.length < 1
+ @tags = Array(params[:tags])
+ # any tag is OK
+ end
- job = @pending_jobs[0]
+ #
+ # Returns true if this job sports the given tag
+ #
+ def has_tag? (tag)
- break if job.at > now
+ @tags.include?(tag)
+ end
- #if job.at <= now
- #
- # obviously
+ #
+ # Removes (cancels) this job from its scheduler.
+ #
+ def unschedule
- trigger job
+ @scheduler.unschedule(@job_id)
+ end
+ end
- @pending_jobs.delete_at 0
- end
- end
+ #
+ # An 'at' job.
+ #
+ class AtJob < Job
- #
- # Triggers the job (in a dedicated thread).
- #
- def trigger (job)
+ #
+ # The float representation (Time.to_f) of the time at which
+ # the job should be triggered.
+ #
+ attr_accessor :at
- Thread.new do
- begin
+ #
+ # The constructor.
+ #
+ def initialize (scheduler, at, at_id, params, &block)
- job.trigger
+ super(scheduler, at_id, params, &block)
+ @at = at
+ end
- rescue Exception => e
+ #
+ # Triggers the job (calls the block)
+ #
+ def trigger
- log_exception e
- end
- end
- end
+ @block.call @job_id, @at
+ end
- #
- # If an error occurs in the job, it well get caught and an error
- # message will be displayed to STDOUT.
- # If this scheduler provides a lwarn(message) method, it will
- # be used insted.
- #
- # Of course, one can override this method.
- #
- def log_exception (e)
+ #
+ # Returns the Time instance at which this job is scheduled.
+ #
+ def schedule_info
- message =
- "trigger() caught exception\n" +
- e.to_s + "\n" +
- e.backtrace.join("\n")
-
- if self.respond_to?(:lwarn)
- lwarn { message }
- else
- puts message
- end
- end
+ Time.at(@at)
+ end
end
#
- # This module adds a trigger method to any class that includes it.
- # The default implementation feature here triggers an exception.
+ # An 'every' job is simply an extension of an 'at' job.
#
- module Schedulable
+ class EveryJob < AtJob
- def trigger (params)
- raise "trigger() implementation is missing"
- end
+ #
+ # Returns the frequency string used to schedule this EveryJob,
+ # like for example "3d" or "1M10d3h".
+ #
+ def schedule_info
- def reschedule (scheduler)
- raise "reschedule() implentation is missing"
- end
+ @params[:every]
+ end
end
- protected
+ #
+ # A cron job.
+ #
+ class CronJob < Job
- JOB_ID_LOCK = Monitor.new
- #
- # would it be better to use a Mutex instead of a full-blown
- # Monitor ?
+ #
+ # The CronLine instance representing the times at which
+ # the cron job has to be triggered.
+ #
+ attr_accessor :cron_line
- #
- # The parent class for scheduled jobs.
- #
- class Job
+ def initialize (scheduler, cron_id, line, params, &block)
- @@last_given_id = 0
- #
- # as a scheduler is fully transient, no need to
- # have persistent ids, a simple counter is sufficient
+ super(scheduler, cron_id, params, &block)
- #
- # The identifier for the job
- #
- attr_accessor :job_id
+ if line.is_a?(String)
- #
- # An array of tags
- #
- attr_accessor :tags
+ @cron_line = CronLine.new(line)
- #
- # The block to execute at trigger time
- #
- attr_accessor :block
+ elsif line.is_a?(CronLine)
- #
- # A reference to the scheduler
- #
- attr_reader :scheduler
+ @cron_line = line
- #
- # Keeping a copy of the initialization params of the job.
- #
- attr_reader :params
-
+ else
- def initialize (scheduler, job_id, params, &block)
+ raise \
+ "Cannot initialize a CronJob " +
+ "with a param of class #{line.class}"
+ end
+ end
- @scheduler = scheduler
- @block = block
+ #
+ # This is the method called by the scheduler to determine if it
+ # has to fire this CronJob instance.
+ #
+ def matches? (time)
+ #def matches? (time, precision)
- if job_id
- @job_id = job_id
- else
- JOB_ID_LOCK.synchronize do
- @job_id = @@last_given_id
- @@last_given_id = @job_id + 1
- end
- end
+ #@cron_line.matches?(time, precision)
+ @cron_line.matches?(time)
+ end
- @params = params
+ #
+ # As the name implies.
+ #
+ def trigger
- #@tags = Array(tags).collect { |tag| tag.to_s }
- # making sure we have an array of String tags
+ @block.call @job_id, @cron_line
+ end
- @tags = Array(params[:tags])
- # any tag is OK
- end
+ #
+ # Returns the original cron tab string used to schedule this
+ # Job. Like for example "60/3 * * * Sun".
+ #
+ def schedule_info
- #
- # Returns true if this job sports the given tag
- #
- def has_tag? (tag)
+ @cron_line.original
+ end
+ end
- @tags.include?(tag)
- end
+ #
+ # A 'cron line' is a line in the sense of a crontab
+ # (man 5 crontab) file line.
+ #
+ class CronLine
- #
- # Removes (cancels) this job from its scheduler.
- #
- def unschedule
+ #
+ # The string used for creating this cronline instance.
+ #
+ attr_reader :original
- @scheduler.unschedule(@job_id)
- end
- end
+ attr_reader \
+ :seconds,
+ :minutes,
+ :hours,
+ :days,
+ :months,
+ :weekdays
- #
- # An 'at' job.
- #
- class AtJob < Job
+ def initialize (line)
- #
- # The float representation (Time.to_f) of the time at which
- # the job should be triggered.
- #
- attr_accessor :at
+ super()
- #
- # The constructor.
- #
- def initialize (scheduler, at, at_id, params, &block)
+ @original = line
- super(scheduler, at_id, params, &block)
- @at = at
- end
+ items = line.split
- #
- # Triggers the job (calls the block)
- #
- def trigger
-
- @block.call @job_id, @at
- end
-
- #
- # Returns the Time instance at which this job is scheduled.
- #
- def schedule_info
-
- Time.at(@at)
- end
+ unless [ 5, 6 ].include?(items.length)
+ raise \
+ "cron '#{line}' string should hold 5 or 6 items, " +
+ "not #{items.length}" \
end
- #
- # An 'every' job is simply an extension of an 'at' job.
- #
- class EveryJob < AtJob
+ offset = items.length - 5
- #
- # Returns the frequency string used to schedule this EveryJob,
- # like for example "3d" or "1M10d3h".
- #
- def schedule_info
-
- @params[:every]
- end
+ @seconds = if offset == 1
+ parse_item(items[0], 0, 59)
+ else
+ [ 0 ]
end
+ @minutes = parse_item(items[0+offset], 0, 59)
+ @hours = parse_item(items[1+offset], 0, 24)
+ @days = parse_item(items[2+offset], 1, 31)
+ @months = parse_item(items[3+offset], 1, 12)
+ @weekdays = parse_weekdays(items[4+offset])
- #
- # A cron job.
- #
- class CronJob < Job
+ #adjust_arrays()
+ end
- #
- # The CronLine instance representing the times at which
- # the cron job has to be triggered.
- #
- attr_accessor :cron_line
+ #
+ # Returns true if the given time matches this cron line.
+ #
+ # (the precision is passed as well to determine if it's
+ # worth checking seconds and minutes)
+ #
+ def matches? (time)
+ #def matches? (time, precision)
- def initialize (scheduler, cron_id, line, params, &block)
+ time = Time.at(time) \
+ if time.kind_of?(Float) or time.kind_of?(Integer)
- super(scheduler, cron_id, params, &block)
+ return false \
+ if no_match?(time.sec, @seconds)
+ #if precision <= 1 and no_match?(time.sec, @seconds)
+ return false \
+ if no_match?(time.min, @minutes)
+ #if precision <= 60 and no_match?(time.min, @minutes)
+ return false \
+ if no_match?(time.hour, @hours)
+ return false \
+ if no_match?(time.day, @days)
+ return false \
+ if no_match?(time.month, @months)
+ return false \
+ if no_match?(time.wday, @weekdays)
- if line.is_a?(String)
+ true
+ end
- @cron_line = CronLine.new(line)
+ #
+ # Returns an array of 6 arrays (seconds, minutes, hours, days,
+ # months, weekdays).
+ # This method is used by the cronline unit tests.
+ #
+ def to_array
+ [ @seconds, @minutes, @hours, @days, @months, @weekdays ]
+ end
- elsif line.is_a?(CronLine)
+ private
- @cron_line = line
+ #--
+ # adjust values to Ruby
+ #
+ #def adjust_arrays()
+ # @hours = @hours.collect { |h|
+ # if h == 24
+ # 0
+ # else
+ # h
+ # end
+ # } if @hours
+ # @weekdays = @weekdays.collect { |wd|
+ # wd - 1
+ # } if @weekdays
+ #end
+ #
+ # dead code, keeping it as a reminder
+ #++
- else
+ WDS = [ "sun", "mon", "tue", "wed", "thu", "fri", "sat" ]
+ #
+ # used by parse_weekday()
- raise \
- "Cannot initialize a CronJob " +
- "with a param of class #{line.class}"
- end
- end
+ def parse_weekdays (item)
- #
- # This is the method called by the scheduler to determine if it
- # has to fire this CronJob instance.
- #
- def matches? (time)
- #def matches? (time, precision)
+ item = item.downcase
- #@cron_line.matches?(time, precision)
- @cron_line.matches?(time)
- end
+ WDS.each_with_index do |day, index|
+ item = item.gsub day, "#{index}"
+ end
- #
- # As the name implies.
- #
- def trigger
+ r = parse_item item, 0, 7
- @block.call @job_id, @cron_line
- end
+ return r unless r.is_a?(Array)
- #
- # Returns the original cron tab string used to schedule this
- # Job. Like for example "60/3 * * * Sun".
- #
- def schedule_info
-
- @cron_line.original
- end
+ r.collect { |e| e == 7 ? 0 : e }.uniq
end
- #
- # A 'cron line' is a line in the sense of a crontab
- # (man 5 crontab) file line.
- #
- class CronLine
+ def parse_item (item, min, max)
- #
- # The string used for creating this cronline instance.
- #
- attr_reader :original
+ return nil \
+ if item == "*"
+ return parse_list(item, min, max) \
+ if item.index(",")
+ return parse_range(item, min, max) \
+ if item.index("*") or item.index("-")
- attr_reader \
- :seconds,
- :minutes,
- :hours,
- :days,
- :months,
- :weekdays
+ i = Integer(item)
- def initialize (line)
+ i = min if i < min
+ i = max if i > max
- super()
+ [ i ]
+ end
- @original = line
+ def parse_list (item, min, max)
- items = line.split
+ items = item.split(",")
- unless [ 5, 6 ].include?(items.length)
- raise \
- "cron '#{line}' string should hold 5 or 6 items, " +
- "not #{items.length}" \
- end
+ items.inject([]) { |r, i| r.push(parse_range(i, min, max)) }.flatten
+ end
- offset = items.length - 5
+ def parse_range (item, min, max)
- @seconds = if offset == 1
- parse_item(items[0], 0, 59)
- else
- [ 0 ]
- end
- @minutes = parse_item(items[0+offset], 0, 59)
- @hours = parse_item(items[1+offset], 0, 24)
- @days = parse_item(items[2+offset], 1, 31)
- @months = parse_item(items[3+offset], 1, 12)
- @weekdays = parse_weekdays(items[4+offset])
+ i = item.index("-")
+ j = item.index("/")
- #adjust_arrays()
- end
+ return item.to_i if (not i and not j)
- #
- # Returns true if the given time matches this cron line.
- #
- # (the precision is passed as well to determine if it's
- # worth checking seconds and minutes)
- #
- def matches? (time)
- #def matches? (time, precision)
+ inc = 1
- time = Time.at(time) \
- if time.kind_of?(Float) or time.kind_of?(Integer)
+ inc = Integer(item[j+1..-1]) if j
- return false \
- if no_match?(time.sec, @seconds)
- #if precision <= 1 and no_match?(time.sec, @seconds)
- return false \
- if no_match?(time.min, @minutes)
- #if precision <= 60 and no_match?(time.min, @minutes)
- return false \
- if no_match?(time.hour, @hours)
- return false \
- if no_match?(time.day, @days)
- return false \
- if no_match?(time.month, @months)
- return false \
- if no_match?(time.wday, @weekdays)
+ istart = -1
+ iend = -1
- true
- end
+ if i
- #
- # Returns an array of 6 arrays (seconds, minutes, hours, days,
- # months, weekdays).
- # This method is used by the cronline unit tests.
- #
- def to_array
- [ @seconds, @minutes, @hours, @days, @months, @weekdays ]
+ istart = Integer(item[0..i-1])
+
+ if j
+ iend = Integer(item[i+1..j])
+ else
+ iend = Integer(item[i+1..-1])
end
- private
+ else # case */x
- #--
- # adjust values to Ruby
- #
- #def adjust_arrays()
- # @hours = @hours.collect { |h|
- # if h == 24
- # 0
- # else
- # h
- # end
- # } if @hours
- # @weekdays = @weekdays.collect { |wd|
- # wd - 1
- # } if @weekdays
- #end
- #
- # dead code, keeping it as a reminder
- #++
+ istart = min
+ iend = max
+ end
- WDS = [ "mon", "tue", "wed", "thu", "fri", "sat", "sun" ]
- #
- # used by parse_weekday()
+ istart = min if istart < min
+ iend = max if iend > max
- def parse_weekdays (item)
+ result = []
- item = item.downcase
+ value = istart
+ loop do
- WDS.each_with_index do |day, index|
- item = item.gsub(day, "#{index+1}")
- end
+ result << value
+ value = value + inc
+ break if value > iend
+ end
- parse_item(item, 1, 7)
- end
+ result
+ end
- def parse_item (item, min, max)
+ def no_match? (value, cron_values)
- return nil \
- if item == "*"
- return parse_list(item, min, max) \
- if item.index(",")
- return parse_range(item, min, max) \
- if item.index("*") or item.index("-")
+ return false if not cron_values
- i = Integer(item)
+ cron_values.each do |v|
+ return false if value == v # ok, it matches
+ end
- i = min if i < min
- i = max if i > max
-
- [ i ]
- end
-
- def parse_list (item, min, max)
-
- items = item.split(",")
-
- items.inject([]) do |result, i|
-
- i = Integer(i)
-
- i = min if i < min
- i = max if i > max
-
- result.push i
- end
- end
-
- def parse_range (item, min, max)
-
- i = item.index("-")
- j = item.index("/")
-
- inc = 1
-
- inc = Integer(item[j+1..-1]) if j
-
- istart = -1
- iend = -1
-
- if i
-
- istart = Integer(item[0..i-1])
-
- if j
- iend = Integer(item[i+1..j])
- else
- iend = Integer(item[i+1..-1])
- end
-
- else # case */x
-
- istart = min
- iend = max
- end
-
- istart = min if istart < min
- iend = max if iend > max
-
- result = []
-
- value = istart
- loop do
-
- result << value
- value = value + inc
- break if value > iend
- end
-
- result
- end
-
- def no_match? (value, cron_values)
-
- return false if not cron_values
-
- cron_values.each do |v|
- return false if value == v # ok, it matches
- end
-
- true # no match found
- end
+ true # no match found
end
+ end
end