# #-- # Copyright (c) 2006-2007, John Mettraux, OpenWFE.org # All rights reserved. # # Redistribution and use in source and binary forms, with or without # modification, are permitted provided that the following conditions are met: # # . Redistributions of source code must retain the above copyright notice, this # list of conditions and the following disclaimer. # # . Redistributions in binary form must reproduce the above copyright notice, # this list of conditions and the following disclaimer in the documentation # and/or other materials provided with the distribution. # # . Neither the name of the "OpenWFE" nor the names of its contributors may be # used to endorse or promote products derived from this software without # specific prior written permission. # # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" # AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE # ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE # LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR # CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF # SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS # INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN # CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) # ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE # POSSIBILITY OF SUCH DAMAGE. #++ # # $Id: definitions.rb 2725 2006-06-02 13:26:32Z jmettraux $ # # # "made in Japan" # # John Mettraux at openwfe.org # require 'monitor' #require 'openwfe/utils' require 'openwfe/util/otime' require 'openwfe/util/stoppable' module OpenWFE # # The Scheduler is used by OpenWFEru for registering 'at' and 'cron' jobs. # 'at' jobs to execute once at a given point in time. 'cron' jobs # execute a specified intervals. # The two main methods are thus schedule_at() and schedule(). # # schedule_at() and schedule() await either a Schedulable instance and # params (usually an array or nil), either a block, which is more in the # Ruby way. # # Two examples : # # scheduler.schedule_in("3d") do # regenerate_monthly_report() # end # # # # will call the regenerate_monthly_report method # # in 3 days from now # # and # # class Regenerator < Schedulable # def trigger (frequency) # self.send(frequency) # end # def monthly # # ... # end # def yearly # # ... # end # end # # regenerator = Regenerator.new # # scheduler.schedule_in("4d", r, :monthly) # # # # will regenerate the monthly report in four days # class Scheduler include MonitorMixin, Stoppable attr_accessor \ :precision def initialize super() @pending_jobs = [] @cron_entries = {} @scheduler_thread = nil @precision = 0.250 # # every 250ms, the scheduler wakes up @last_cron_minute = -1 end # # Starts this scheduler (or restart it if it was previously stopped) # def start #if @scheduler_thread # @scheduler_thread.wakeup # return #end @scheduler_thread = Thread.new do while true break if self.is_stopped? #print "." #$stdout.flush step sleep(@precision) end end do_restart end # # The scheduler is stoppable via stop() or do_stop() # alias :stop :do_stop # # Joins on the scheduler thread # def join @scheduler_thread.join end # # Schedules a job by specifying at which time it should trigger. # Returns the a job_id that can be used to unschedule the job. # def schedule_at (at, schedulable=nil, params=nil, &block) synchronize do #puts "0 at is '#{at.to_s}' (#{at.class})" at = OpenWFE::to_ruby_time(at) \ if at.kind_of? String at = OpenWFE::to_gm_time(at) \ if at.kind_of? DateTime at = at.to_f \ if at.kind_of? Time #puts "1 at is '#{at.to_s}' (#{at.class})"}" b = to_block(schedulable, params, &block) job = AtEntry.new(at, &b) if at < (Time.new.to_f + @precision) job.trigger() return nil end return push(job) \ if @pending_jobs.length < 1 # shortcut : check if the new job is posterior to # the last job pending return push(job) \ if at >= @pending_jobs.last.at for i in 0...@pending_jobs.length if at <= @pending_jobs[i].at return push(job, i) end end return push(job) end end # # Schedules a job by stating in how much time it should trigger. # Returns the a job_id that can be used to unschedule the job. # def schedule_in (duration, schedulable=nil, params=nil, &block) if duration.kind_of?(String) duration = OpenWFE::parse_time_string(duration) elsif not duration.kind_of?(Float) duration = Float(duration.to_s) end return schedule_at( Time.new.to_f + duration, schedulable, params, &block) end # # Unschedules an 'at' or a 'cron' job identified by the id # it was given at schedule time. # def unschedule (job_id) synchronize do for i in 0...@pending_jobs.length if @pending_jobs[i].eid == job_id @pending_jobs.delete_at(i) return true end end return true if unschedule_cron_job(job_id) return false end end # # Unschedules a cron job # def unschedule_cron_job (job_id) synchronize do if @cron_entries.has_key?(job_id) @cron_entries.delete(job_id) return true end return false end end # # Schedules a cron job, the 'cron_line' is a string # following the Unix cron standard (see "man 5 crontab" in your command # line). # # For example : # # scheduler.schedule("5 0 * * *", nil, s, p) # # will trigger the schedulable s with params p every day # # five minutes after midnight # # scheduler.schedule("15 14 1 * *", nil, s, p) # # will trigger s at 14:15 on the first of every month # # scheduler.schedule("0 22 * * 1-5") do # puts "it's break time..." # end # # outputs a message every weekday at 10pm # # Returns the job id attributed to this 'cron job', this id can # be used to unschedule the job. # def schedule \ (cron_line, cron_id=nil, schedulable=nil, params=nil, &block) synchronize do # # is a job with the same id already scheduled ? if cron_id if unschedule(cron_id) ldebug do "schedule() unscheduled previous job "+ "under same name '#{cron_id}'" end end end # # schedule b = to_block(schedulable, params, &block) entry = CronEntry.new(cron_id, cron_line, &b) @cron_entries[entry.eid] = entry return entry.eid end end # # Returns the job corresponding to job_id, an instance of AtEntry # or CronEntry will be returned. # def get_job (job_id) entry = @cron_entries[job_id] return c if c @pending_jobs.each do |entry| return entry if entry.eid == job_id end return nil end # # Returns the number of currently pending jobs in this scheduler. # def pending_job_count @pending_jobs.size end # # Returns the number of cron jobs currently active in this scheduler. # def cron_job_count @cron_entries.size end protected def to_block (schedulable, params, &block) if schedulable lambda do schedulable.trigger(params) end else block end end # # Pushes an 'at' job into the pending job list # def push (job, index=-1) if index == -1 # # push job at the end # @pending_jobs << job else # # insert job at given index # @pending_jobs[index, 0] = job end #puts "push() at '#{Time.at(job.at)}'" return job.eid end # # This is the method called each time the scheduler wakes up # (by default 4 times per second). It's meant to quickly # determine if there are jobs to trigger else to get back to sleep. # 'cron' jobs get executed if necessary then 'at' jobs. # def step synchronize do now = Time.new minute = now.min # # cron entries begin if now.sec == 0 and minute > @last_cron_minute # # only consider cron entries at the second 0 of a # minute @last_cron_minute = minute @cron_entries.each do |cron_id, cron_entry| #puts "step() cron_id : #{cron_id}" cron_entry.trigger \ if cron_entry.matches? now end end rescue Exception => e #puts \ # "step() caught exception\n" + # OpenWFE::exception_to_s(e) end # # pending jobs now = now.to_f # # that's what at jobs do understand while true #puts "step() job.count is #{@pending_jobs.length}" break if @pending_jobs.length < 1 job = @pending_jobs[0] #puts "step() job.at is #{job.at}" #puts "step() now is #{now}" break if job.at > now #if job.at <= now # # obviously job.trigger() @pending_jobs.delete_at(0) end end end end # # This module adds a trigger method to any class that includes it. # The default implementation feature here triggers an exception. # module Schedulable def trigger (params) raise "trigger() implementation is missing" end def reschedule (scheduler) raise "reschedule() implentation is missing" end end protected JOB_ID_LOCK = Monitor.new class Entry @@last_given_id = 0 # # as a scheduler is fully transient, no need to # have persistent ids, a simple counter is sufficient attr_accessor \ :eid, :block def initialize (entry_id=nil, &block) @block = block if entry_id @eid = entry_id else JOB_ID_LOCK.synchronize do @eid = @@last_given_id @@last_given_id = @eid + 1 end end end #def trigger # @block.call @eid #end end class AtEntry < Entry attr_accessor \ :at def initialize (at, &block) super(&block) @at = at end def trigger @block.call @eid, @at end end class CronEntry < Entry attr_accessor \ :cron_line def initialize (cron_id, line, &block) super(cron_id, &block) if line.kind_of? String @cron_line = CronLine.new(line) elsif line.kind_of? CronLine @cron_line = line else raise \ "Cannot initialize a CronEntry " + "with a param of class #{line.class}" end end def matches? (time) @cron_line.matches? time end def trigger @block.call @eid, @cron_line end end # # A 'cron line' is a line in the sense of a crontab # (man 5 crontab) file line. # class CronLine attr_reader \ :minutes, :hours, :days, :months, :weekdays def initialize (line) super() items = line.split if items.length != 5 raise \ "cron '#{line}' string should hold 5 items, " + "not #{items.length}" \ end @minutes = parse_item(items[0], 0, 59) @hours = parse_item(items[1], 0, 24) @days = parse_item(items[2], 1, 31) @months = parse_item(items[3], 1, 12) @weekdays = parse_weekdays(items[4]) adjust_arrays() end def matches? (time) if time.kind_of?(Float) or time.kind_of?(Integer) time = Time.at(time) end return false if no_match?(time.min, @minutes) return false if no_match?(time.hour, @hours) return false if no_match?(time.day, @days) return false if no_match?(time.month, @months) return false if no_match?(time.wday, @weekdays) return true end # # Returns an array of 5 arrays (minutes, hours, days, months, # weekdays). # This method is used by the cronline unit tests. # def to_array [ @minutes, @hours, @days, @months, @weekdays ] end private # # adjust values to Ruby # def adjust_arrays() if @hours @hours.each do |h| h = 0 if h == 23 end end if @weekdays @weekdays.each do |wd| wd = wd - 1 end end end WDS = [ "mon", "tue", "wed", "thu", "fri", "sat", "sun" ] # # used by parse_weekday() def parse_weekdays (item) item = item.downcase WDS.each_with_index do |day, index| item = item.gsub(day, "#{index+1}") end return parse_item(item, 1, 7) end def parse_item (item, min, max) return nil \ if item == "*" return parse_list(item, min, max) \ if item.index(",") return parse_range(item, min, max) \ if item.index("*") or item.index("-") i = Integer(item) i = min if i < min i = max if i > max return [ i ] end def parse_list (item, min, max) items = item.split(",") result = [] items.each do |i| i = Integer(i) i = min if i < min i = max if i > max result << i end return result end def parse_range (item, min, max) i = item.index("-") j = item.index("/") inc = 1 inc = Integer(item[j+1..-1]) if j istart = -1 iend = -1 if i istart = Integer(item[0..i-1]) if j iend = Integer(item[i+1..j]) else iend = Integer(item[i+1..-1]) end else # case */x istart = min iend = max end istart = min if istart < min iend = max if iend > max result = [] value = istart while true result << value value = value + inc break if value > iend end return result end def no_match? (value, cron_values) return false if not cron_values cron_values.each do |v| return false if value == v end return true end end end