lib/openwfe/util/scheduler.rb in openwferu-0.9.2 vs lib/openwfe/util/scheduler.rb in openwferu-0.9.3

- old
+ new

@@ -39,18 +39,58 @@ # John Mettraux at openwfe.org # require 'monitor' -require 'openwfe/otime' -require 'openwfe/utils' +#require 'openwfe/utils' +require 'openwfe/util/otime' +require 'openwfe/util/stoppable' module OpenWFE + # + # The Scheduler is used by OpenWFEru for registering 'at' and 'cron' jobs. + # 'at' jobs to execute once at a given point in time. 'cron' jobs + # execute a specified intervals. + # The two main methods are thus schedule_at() and schedule(). + # + # schedule_at() and schedule() await either a Schedulable instance and + # params (usually an array or nil), either a block, which is more in the + # Ruby way. + # + # Two examples : + # + # scheduler.schedule_in("3d") do + # regenerate_monthly_report() + # end + # # + # # will call the regenerate_monthly_report method + # # in 3 days from now + # + # and + # + # class Regenerator < Schedulable + # def trigger (frequency) + # self.send(frequency) + # end + # def monthly + # # ... + # end + # def yearly + # # ... + # end + # end + # + # regenerator = Regenerator.new + # + # scheduler.schedule_in("4d", r, :monthly) + # # + # # will regenerate the monthly report in four days + # class Scheduler - include MonitorMixin + include MonitorMixin, Stoppable attr_accessor \ :precision def initialize @@ -67,96 +107,50 @@ # every 250ms, the scheduler wakes up @last_cron_minute = -1 end - def stop - @scheduler_thread.stop \ - if @scheduler_thread and not @scheduler_thread.stop? - end - + # + # Starts this scheduler (or restart it if it was previously stopped) + # def start - if @scheduler_thread - @scheduler_thread.wakeup - return - end + #if @scheduler_thread + # @scheduler_thread.wakeup + # return + #end + @scheduler_thread = Thread.new do while true + break if self.is_stopped? + #print "." + #$stdout.flush step sleep(@precision) end end - end - def step - synchronize do - now = Time.new - minute = now.to_i / 60 - - #puts "step() minute is #{minute}" - #puts "step() last_cron_minute is #{@last_cron_minute}" - - # - # cron entries - - begin - if minute > @last_cron_minute - @last_cron_minute = minute - @cron_entries.each do |cron_id, cron_entry| - #puts "step() cron_id : #{cron_id}" - cron_entry.trigger \ - if cron_entry.matches? now - end - end - rescue Exception => e - #puts \ - # "step() caught exception\n" + - # OpenWFE::exception_to_s(e) - end - - # - # pending jobs - - now = now.to_f - # - # that's what at jobs do understand - - while true - - #puts "step() job.count is #{@pending_jobs.length}" - - break if @pending_jobs.length < 1 - - job = @pending_jobs[0] - - #puts "step() job.at is #{job.at}" - #puts "step() now is #{now}" - - break if job.at > now - - #if job.at <= now - # - # obviously - - job.trigger() - @pending_jobs.delete_at(0) - end - end + do_restart end # - # joins on the scheduler thread + # The scheduler is stoppable via stop() or do_stop() # + alias :stop :do_stop + + # + # Joins on the scheduler thread + # def join @scheduler_thread.join end # - # schedules a job by specifying at which time it should trigger + # Schedules a job by specifying at which time it should trigger. + # Returns the a job_id that can be used to unschedule the job. # - def schedule_at (at, schedulable, params) + def schedule_at (at, schedulable=nil, params=nil, &block) synchronize do #puts "0 at is '#{at.to_s}' (#{at.class})" at = OpenWFE::to_ruby_time(at) \ @@ -166,13 +160,14 @@ if at.kind_of? DateTime at = at.to_f \ if at.kind_of? Time - #puts "1 at is '#{at.to_s}' (#{at.class})" + #puts "1 at is '#{at.to_s}' (#{at.class})"}" - job = JobEntry.new(at, schedulable, params) + b = to_block(schedulable, params, &block) + job = AtEntry.new(at, &b) if at < (Time.new.to_f + @precision) job.trigger() return nil end @@ -191,63 +186,143 @@ return push(job, i) end end return push(job) - end end # - # schedules a job by stating in how much time it should trigger + # Schedules a job by stating in how much time it should trigger. + # Returns the a job_id that can be used to unschedule the job. # - def schedule_in (duration, schedulable, params) + def schedule_in (duration, schedulable=nil, params=nil, &block) if duration.kind_of?(String) duration = OpenWFE::parse_time_string(duration) elsif not duration.kind_of?(Float) duration = Float(duration.to_s) end - return schedule_at(Time.new.to_f + duration, schedulable, params) + return schedule_at( + Time.new.to_f + duration, schedulable, params, &block) end # - # unschedules 'at' or 'cron' job + # Unschedules an 'at' or a 'cron' job identified by the id + # it was given at schedule time. # - def unschedule (entry_id) + def unschedule (job_id) synchronize do for i in 0...@pending_jobs.length - if @pending_jobs[i].eid == entry_id + if @pending_jobs[i].eid == job_id @pending_jobs.delete_at(i) return true end end - if @cron_entries.has_key?(entry_id) - @cron_entries.delete(entry_id) + return true if unschedule_cron_job(job_id) + + return false + end + end + + # + # Unschedules a cron job + # + def unschedule_cron_job (job_id) + synchronize do + if @cron_entries.has_key?(job_id) + @cron_entries.delete(job_id) return true end - return false end end # - # schedules a cron job + # Schedules a cron job, the 'cron_line' is a string + # following the Unix cron standard (see "man 5 crontab" in your command + # line). # - def schedule (cron_line, schedulable, params) + # For example : + # + # scheduler.schedule("5 0 * * *", nil, s, p) + # # will trigger the schedulable s with params p every day + # # five minutes after midnight + # + # scheduler.schedule("15 14 1 * *", nil, s, p) + # # will trigger s at 14:15 on the first of every month + # + # scheduler.schedule("0 22 * * 1-5") do + # puts "it's break time..." + # end + # # outputs a message every weekday at 10pm + # + # Returns the job id attributed to this 'cron job', this id can + # be used to unschedule the job. + # + def schedule \ + (cron_line, cron_id=nil, schedulable=nil, params=nil, &block) + synchronize do - entry = CronEntry.new(cron_line, schedulable, params) + + # + # is a job with the same id already scheduled ? + + if cron_id + if unschedule(cron_id) + ldebug do + "schedule() unscheduled previous job "+ + "under same name '#{cron_id}'" + end + end + end + + # + # schedule + + b = to_block(schedulable, params, &block) + entry = CronEntry.new(cron_id, cron_line, &b) @cron_entries[entry.eid] = entry + return entry.eid end end + # + # Returns the job corresponding to job_id, an instance of AtEntry + # or CronEntry will be returned. + # + def get_job (job_id) + + entry = @cron_entries[job_id] + return c if c + + @pending_jobs.each do |entry| + return entry if entry.eid == job_id + end + + return nil + end + protected + def to_block (schedulable, params, &block) + if schedulable + lambda do + schedulable.trigger(params) + end + else + block + end + end + + # + # Pushes an 'at' job into the pending job list + # def push (job, index=-1) if index == -1 # # push job at the end @@ -262,10 +337,74 @@ #puts "push() at '#{Time.at(job.at)}'" return job.eid end + + # + # This is the method called each time the scheduler wakes up + # (by default 4 times per second). It's meant to quickly + # determine if there are jobs to trigger else to get back to sleep. + # 'cron' jobs get executed if necessary then 'at' jobs. + # + def step + synchronize do + now = Time.new + minute = now.min + + # + # cron entries + + begin + if now.sec == 0 and minute > @last_cron_minute + # + # only consider cron entries at the second 0 of a + # minute + + @last_cron_minute = minute + + @cron_entries.each do |cron_id, cron_entry| + #puts "step() cron_id : #{cron_id}" + cron_entry.trigger \ + if cron_entry.matches? now + end + end + rescue Exception => e + #puts \ + # "step() caught exception\n" + + # OpenWFE::exception_to_s(e) + end + + # + # pending jobs + + now = now.to_f + # + # that's what at jobs do understand + + while true + + #puts "step() job.count is #{@pending_jobs.length}" + + break if @pending_jobs.length < 1 + + job = @pending_jobs[0] + + #puts "step() job.at is #{job.at}" + #puts "step() now is #{now}" + + break if job.at > now + + #if job.at <= now + # + # obviously + + job.trigger() + @pending_jobs.delete_at(0) + end + end + end end # # This module adds a trigger method to any class that includes it. # The default implementation feature here triggers an exception. @@ -273,216 +412,261 @@ module Schedulable def trigger (params) raise "trigger() implementation is missing" end + + def reschedule (scheduler) + raise "reschedule() implentation is missing" + end end - # - # a 'cron line' is a line in the sense of a crontab (man 5 cron) file - # - class CronLine + protected - def initialize (line) + JOB_ID_LOCK = Monitor.new - super() + class Entry - items = line.split + @@last_given_id = 0 + # + # as a scheduler is fully transient, no need to + # have persistent ids, a simple counter is sufficient - if items.length != 5 - raise \ - "cron '#{line}' string should hold 5 items, " + - "not #{items.length}" \ + attr_accessor \ + :eid, :block + + def initialize (entry_id=nil, &block) + @block = block + if entry_id + @eid = entry_id + else + JOB_ID_LOCK.synchronize do + @eid = @@last_given_id + @@last_given_id = @eid + 1 + end + end end - @minutes = parse_item(items[0], 0, 59) - @hours = parse_item(items[1], 0, 24) - @days = parse_item(items[2], 1, 31) - @months = parse_item(items[3], 1, 12) - @weekdays = parse_item(items[4], 1, 7) - - adjust_arrays() + #def trigger + # @block.call @eid + #end end - def matches? (time) + class AtEntry < Entry - if time.kind_of?(Float) or time.kind_of?(Integer) - time = Time.at(time) + attr_accessor \ + :at + + def initialize (at, &block) + super(&block) + @at = at end - return false if no_match?(time.min, @minutes) - return false if no_match?(time.hour, @hours) - return false if no_match?(time.day, @days) - return false if no_match?(time.month, @months) - return false if no_match?(time.wday, @weekdays) - - return true + def trigger + @block.call @eid, @at + end end - private + class CronEntry < Entry - # - # adjust values to Ruby - # - def adjust_arrays() - if @hours - @hours.each do |h| - h = 0 if h == 23 - end + attr_accessor \ + :cron_line + + def initialize (cron_id, line, &block) + + super(cron_id, &block) + + if line.kind_of? String + @cron_line = CronLine.new(line) + elsif line.kind_of? CronLine + @cron_line = line + else + raise \ + "Cannot initialize a CronEntry " + + "with a param of class #{line.class}" end - if @weekdays - @weekdays.each do |wd| - wd = wd - 1 - end - end end - def parse_item (item, min, max) + def matches? (time) + @cron_line.matches? time + end - return nil \ - if item == "*" - return parse_list(item, min, max) \ - if item.index(",") > -1 - return parse_range(item, min, max) \ - if item.index("*") > -1 or item.index("-") > -1 + def trigger + @block.call @eid, @cron_line + end + end - i = Integer(item) + # + # A 'cron line' is a line in the sense of a crontab + # (man 5 crontab) file line. + # + class CronLine - i = min if i < min - i = max if i > max + attr_reader \ + :minutes, + :hours, + :days, + :months, + :weekdays - return [ i ] - end + def initialize (line) - def parse_list (item, min, max) - items = item.split(",") - result = [] - items.each do |i| - i = Integer(i) - i = min if i < min - i = max if i > max - result << i + super() + + items = line.split + + if items.length != 5 + raise \ + "cron '#{line}' string should hold 5 items, " + + "not #{items.length}" \ end - return result + + @minutes = parse_item(items[0], 0, 59) + @hours = parse_item(items[1], 0, 24) + @days = parse_item(items[2], 1, 31) + @months = parse_item(items[3], 1, 12) + @weekdays = parse_weekdays(items[4]) + + adjust_arrays() end - def parse_range (item, min, max) - i = item.index("-") - j = item.index("/") + def matches? (time) - inc = 1 + if time.kind_of?(Float) or time.kind_of?(Integer) + time = Time.at(time) + end - inc = Integer(item[j+1..-1]) if j > -1 + return false if no_match?(time.min, @minutes) + return false if no_match?(time.hour, @hours) + return false if no_match?(time.day, @days) + return false if no_match?(time.month, @months) + return false if no_match?(time.wday, @weekdays) - istart = -1 - iend = -1 + return true + end - if i > -1 + # + # Returns an array of 5 arrays (minutes, hours, days, months, + # weekdays). + # This method is used by the cronline unit tests. + # + def to_array + [ @minutes, @hours, @days, @months, @weekdays ] + end - istart = Integer(item[0..i]) + private - if j > -1 - iend = Integer(item[i+1..j]) - else - iend = Integer(i+1..-1) + # + # adjust values to Ruby + # + def adjust_arrays() + if @hours + @hours.each do |h| + h = 0 if h == 23 + end end - - else # case */x - istart = min - iend = max + if @weekdays + @weekdays.each do |wd| + wd = wd - 1 + end + end end - istart = min if istart < min - iend = max if iend > max + WDS = [ "mon", "tue", "wed", "thu", "fri", "sat", "sun" ] + # + # used by parse_weekday() - result = [] + def parse_weekdays (item) - value = istart - while true - result << value - value = value + inc - break if value > iend - end + item = item.downcase - return result - end + WDS.each_with_index do |day, index| + item = item.gsub(day, "#{index+1}") + end - def no_match? (value, cron_values) + return parse_item(item, 1, 7) + end - return false if not cron_values + def parse_item (item, min, max) - cron_values.each do |v| - return false if value == v - end + return nil \ + if item == "*" + return parse_list(item, min, max) \ + if item.index(",") + return parse_range(item, min, max) \ + if item.index("*") or item.index("-") - return true - end - end + i = Integer(item) - protected + i = min if i < min + i = max if i > max - JOB_ID_LOCK = Monitor.new + return [ i ] + end - class Entry + def parse_list (item, min, max) + items = item.split(",") + result = [] + items.each do |i| + i = Integer(i) + i = min if i < min + i = max if i > max + result << i + end + return result + end - @@last_given_id = 0 - # - # as a scheduler is fully transient, no need to - # have persistent ids, a simple counter is sufficient + def parse_range (item, min, max) + i = item.index("-") + j = item.index("/") - attr_accessor \ - :eid, :schedulable, :params + inc = 1 - def initialize (schedulable, params) - @schedulable = schedulable - @params = params - JOB_ID_LOCK.synchronize do - @eid = @@last_given_id + 1 - @@last_given_id = @eid - end - end + inc = Integer(item[j+1..-1]) if j - def trigger - @schedulable.trigger(params) - end - end + istart = -1 + iend = -1 - class JobEntry < Entry + if i - attr_accessor \ - :at + istart = Integer(item[0..i-1]) - def initialize (at, schedulable, params) - super(schedulable, params) - @at = at - end - end + if j + iend = Integer(item[i+1..j]) + else + iend = Integer(item[i+1..-1]) + end - class CronEntry < Entry + else # case */x + istart = min + iend = max + end - attr_accessor \ - :cron_line + istart = min if istart < min + iend = max if iend > max - def initialize (line, schedulable, params) + result = [] - super(schedulable, params) + value = istart + while true + result << value + value = value + inc + break if value > iend + end - if line.kind_of?(String) - @cronline = CronLine.new(line) - elsif line.kind_of?(CronLine) - @cronline = line - else - raise \ - "Cannot initialize a CronEntry " + - "with a param of class #{line.class}" + return result end - @cron_line = CronLine.new(line) - end + def no_match? (value, cron_values) - def matches? (time) - @cron_line.matches?(time) - end + return false if not cron_values + + cron_values.each do |v| + return false if value == v + end + + return true + end end end