# # # Copyright (c) 2006, John Mettraux, OpenWFE.org # All rights reserved. # # Redistribution and use in source and binary forms, with or without # modification, are permitted provided that the following conditions are met: # # . Redistributions of source code must retain the above copyright notice, this # list of conditions and the following disclaimer. # # . Redistributions in binary form must reproduce the above copyright notice, # this list of conditions and the following disclaimer in the documentation # and/or other materials provided with the distribution. # # . Neither the name of the "OpenWFE" nor the names of its contributors may be # used to endorse or promote products derived from this software without # specific prior written permission. # # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" # AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE # ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE # LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR # CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF # SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS # INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN # CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) # ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE # POSSIBILITY OF SUCH DAMAGE. 
#

#
# $Id: definitions.rb 2725 2006-06-02 13:26:32Z jmettraux $
#

#
# "made in Japan"
#
# John Mettraux at openwfe.org
#

require 'monitor'
require 'otime'
require 'ru/ruutils'

module OpenWFEru

  #
  # An in-process scheduler handling two kinds of entries :
  #
  # * 'at' jobs (JobEntry), kept in a queue sorted by trigger time and
  #   triggered once, as soon as a step finds their time has been reached
  # * 'cron' entries (CronEntry), checked once per minute against the
  #   current time
  #
  # All the operations touching the job queue or the cron entries run
  # inside the monitor provided by MonitorMixin.
  #
  class Scheduler
    include MonitorMixin

    attr_accessor :precision

    def initialize
      super()

      @pending_jobs = []
      @cron_entries = {}

      @scheduler_thread = nil
      @stopped = false

      @precision = 0.250
        #
        # every 250ms, the scheduler wakes up

      @last_cron_minute = -1
    end

    #
    # Pauses the scheduler : the scheduler thread stays alive but
    # triggers nothing until start is called again.
    #
    # A flag is used because Thread offers no instance-level way of
    # putting another thread to sleep (Thread.stop only affects the
    # calling thread).
    #
    def stop
      @stopped = true
    end

    #
    # Starts the scheduler thread, or resumes it after a stop.
    # If the thread died, a fresh one is created.
    #
    # Returns the new Thread when one had to be created, nil otherwise.
    #
    def start
      @stopped = false

      return nil if @scheduler_thread && @scheduler_thread.alive?

      @scheduler_thread = Thread.new do
        loop do
          step unless @stopped
          sleep(@precision)
        end
      end
    end

    #
    # Performs one scheduling pass : triggers the cron entries matching
    # the current time (at most once per minute), then triggers and
    # removes every pending job whose time has been reached.
    #
    # An exception raised by one entry is reported on stderr and does
    # not prevent the remaining entries from being considered.
    #
    def step
      synchronize do
        now = Time.new
        minute = now.to_i / 60

        #
        # cron entries

        if minute > @last_cron_minute
          @last_cron_minute = minute

          # iterate over a snapshot, a triggered entry may well
          # schedule or unschedule cron entries
          @cron_entries.values.each do |cron_entry|
            guarded { cron_entry.trigger if cron_entry.matches?(now) }
          end
        end

        #
        # pending jobs

        now = now.to_f
          #
          # that's what at jobs do understand

        until @pending_jobs.empty? || @pending_jobs.first.at > now
          # the job leaves the queue before being triggered, so that a
          # failing job is never triggered twice
          job = @pending_jobs.shift
          guarded { job.trigger }
        end
      end
    end

    #
    # joins on the scheduler thread (does nothing if the scheduler
    # was never started)
    #
    def join
      @scheduler_thread.join if @scheduler_thread
    end

    #
    # schedules a job by specifying at which time it should trigger
    #
    # 'at' may be a Float (seconds since the epoch), a Time or a String.
    # Strings are handed to OpenWFE::to_ruby_time (defined outside of
    # this file, presumably returning a Time -- to be confirmed).
    #
    # When the time lies less than 'precision' seconds ahead (or in the
    # past), the job is triggered immediately and nil is returned;
    # else the id of the queued job is returned.
    #
    def schedule_at(at, schedulable, params)
      synchronize do
        at = OpenWFE::to_ruby_time(at) if at.kind_of?(String)
        at = at.to_f if at.kind_of?(Time)

        job = JobEntry.new(at, schedulable, params)

        if at < (Time.new.to_f + @precision)
          job.trigger
          return nil
        end

        # shortcut : the new job is posterior (or equal) to
        # the last job pending
        if @pending_jobs.empty? || at >= @pending_jobs.last.at
          return push(job)
        end

        # insert right before the first job that triggers strictly
        # later, jobs sharing the same time thus stay in FIFO order
        @pending_jobs.each_with_index do |pending, i|
          return push(job, i) if at < pending.at
        end

        return push(job)
      end
    end

    #
    # schedules a job by stating in how much time it should trigger
    #
    # 'duration' is a number of seconds; Strings are handed to
    # OpenWFE::parse_time_string (defined outside of this file), any
    # other non-Float value is converted via Float(duration.to_s).
    #
    def schedule_in(duration, schedulable, params)
      if duration.kind_of?(String)
        duration = OpenWFE::parse_time_string(duration)
      elsif !duration.kind_of?(Float)
        duration = Float(duration.to_s)
      end

      schedule_at(Time.new.to_f + duration, schedulable, params)
    end

    #
    # unschedules 'at' or 'cron' job
    #
    # Returns true if an entry with the given id was found and removed,
    # false otherwise.
    #
    def unschedule(entry_id)
      synchronize do
        @pending_jobs.each_with_index do |job, i|
          next unless job.eid == entry_id
          @pending_jobs.delete_at(i)
          return true
        end

        if @cron_entries.has_key?(entry_id)
          @cron_entries.delete(entry_id)
          return true
        end

        return false
      end
    end

    #
    # schedules a cron job
    #
    # 'cron_line' is either a String or a CronLine instance.
    # Returns the id of the new cron entry.
    #
    def schedule(cron_line, schedulable, params)
      synchronize do
        entry = CronEntry.new(cron_line, schedulable, params)
        @cron_entries[entry.eid] = entry
        return entry.eid
      end
    end

    protected

    #
    # Adds the job to the pending queue, at the given index or at the
    # end of the queue when index is -1. Returns the id of the job.
    #
    def push(job, index = -1)
      if index == -1
        #
        # push job at the end
        #
        @pending_jobs << job
      else
        #
        # insert job at given index
        #
        @pending_jobs.insert(index, job)
      end

      job.eid
    end

    #
    # Runs the given block, reporting any StandardError on stderr
    # instead of letting it propagate.
    #
    def guarded
      yield
    rescue StandardError => e
      warn "scheduler caught exception : #{e.class} : #{e.message}"
    end
  end

  #
  # This module adds a trigger method to any class that includes it.
  # The default implementation featured here raises an exception.
  #
  module Schedulable
    def trigger(params)
      raise "trigger() implementation is missing"
    end
  end

  #
  # a 'cron line' is a line in the sense of a crontab (man 5 cron) file
  #
  # Five whitespace separated items are expected : minutes (0-59),
  # hours (0-24, 24 being read as 0), days of the month (1-31),
  # months (1-12) and weekdays (0-7, 0 and 7 both standing for Sunday).
  #
  # Each item is either "*", a single number, a comma separated list
  # of numbers, or a range ("a-b", "a-b/step", "*/step").
  # Numbers outside of the allowed interval are brought back to its
  # nearest bound.
  #
  class CronLine

    #
    # Raises a RuntimeError if the line does not hold exactly 5 items
    # and an ArgumentError if a number cannot be parsed.
    #
    def initialize(line)
      super()

      items = line.split

      unless items.length == 5
        raise \
          "cron '#{line}' string should hold 5 items, " +
          "not #{items.length}"
      end

      @minutes = parse_item(items[0], 0, 59)
      @hours = parse_item(items[1], 0, 24)
      @days = parse_item(items[2], 1, 31)
      @months = parse_item(items[3], 1, 12)
      @weekdays = parse_item(items[4], 0, 7)

      adjust_arrays
    end

    #
    # Returns true if the given time (a Time instance, or a Float or
    # Integer count of seconds since the epoch) matches this cron line.
    #
    def matches?(time)
      time = Time.at(time) if time.kind_of?(Float) || time.kind_of?(Integer)

      return false if no_match?(time.min, @minutes)
      return false if no_match?(time.hour, @hours)
      return false if no_match?(time.day, @days)
      return false if no_match?(time.month, @months)
      return false if no_match?(time.wday, @weekdays)

      true
    end

    private

    #
    # adjust values to Ruby
    #
    # Time#hour ranges from 0 to 23 and Time#wday from 0 (Sunday) to 6,
    # hence hour 24 becomes 0 and weekday 7 becomes 0.
    #
    def adjust_arrays
      @hours = @hours.map { |h| h == 24 ? 0 : h }.uniq if @hours
      @weekdays = @weekdays.map { |wd| wd == 7 ? 0 : wd }.uniq if @weekdays
    end

    #
    # Parses one item of the cron line. Returns nil for "*" (meaning
    # 'any value'), else an array of the accepted values.
    #
    def parse_item(item, min, max)
      return nil if item == "*"

      return parse_list(item, min, max) if item.include?(",")

      if item.include?("*") || item.include?("-")
        return parse_range(item, min, max)
      end

      [ clip(Integer(item), min, max) ]
    end

    #
    # Parses a comma separated list of numbers.
    #
    def parse_list(item, min, max)
      item.split(",").map { |i| clip(Integer(i), min, max) }
    end

    #
    # Parses "a-b", "a-b/step" or "*/step".
    # The resulting array always holds at least the start value.
    #
    def parse_range(item, min, max)
      dash = item.index("-")
      slash = item.index("/")
        #
        # String#index returns nil when the character is not found

      inc = slash ? Integer(item[(slash + 1)..-1]) : 1

      # an increment of zero (or less) would never reach the end
      # of the range
      if inc < 1
        raise ArgumentError, "cron increment must be positive in '#{item}'"
      end

      if dash
        istart = Integer(item[0...dash])
        iend = Integer(slash ? item[(dash + 1)...slash] : item[(dash + 1)..-1])
      else
        #
        # case */x
        istart = min
        iend = max
      end

      istart = min if istart < min
      iend = max if iend > max

      result = []
      value = istart

      loop do
        result << value
        value += inc
        break if value > iend
      end

      result
    end

    #
    # Brings the value back into the [min, max] interval.
    #
    def clip(value, min, max)
      return min if value < min
      return max if value > max
      value
    end

    #
    # Returns true when cron_values is set and does not hold the value
    # (a nil cron_values stands for "*" and matches anything).
    #
    def no_match?(value, cron_values)
      return false unless cron_values

      !cron_values.include?(value)
    end
  end

  #
  # guards the attribution of entry ids
  #
  JOB_ID_LOCK = Monitor.new

  #
  # The base for job and cron entries : a schedulable, the params to
  # hand to it when triggering and an id unique within this process.
  #
  class Entry

    @@last_given_id = 0
      #
      # as a scheduler is fully transient, no need to
      # have persistent ids, a simple counter is sufficient

    attr_accessor :eid, :schedulable, :params

    def initialize(schedulable, params)
      @schedulable = schedulable
      @params = params

      JOB_ID_LOCK.synchronize do
        @eid = @@last_given_id + 1
        @@last_given_id = @eid
      end
    end

    #
    # Calls trigger(params) on the schedulable.
    #
    def trigger
      @schedulable.trigger(params)
    end
  end

  #
  # An entry triggered once, at the time 'at' (the scheduler stores
  # it as a Float, seconds since the epoch).
  #
  class JobEntry < Entry

    attr_accessor :at

    def initialize(at, schedulable, params)
      super(schedulable, params)
      @at = at
    end
  end

  #
  # An entry triggered each time its cron line matches.
  #
  class CronEntry < Entry

    attr_accessor :cron_line

    #
    # 'line' is either a cron String or an already built CronLine,
    # anything else raises a RuntimeError.
    #
    def initialize(line, schedulable, params)
      super(schedulable, params)

      if line.kind_of?(String)
        @cron_line = CronLine.new(line)
      elsif line.kind_of?(CronLine)
        @cron_line = line
      else
        raise \
          "Cannot initialize a CronEntry " +
          "with a param of class #{line.class}"
      end
    end

    def matches?(time)
      @cron_line.matches?(time)
    end
  end

end