lib/openwfe/util/scheduler.rb in openwferu-0.9.2 vs lib/openwfe/util/scheduler.rb in openwferu-0.9.3
- old
+ new
@@ -39,18 +39,58 @@
# John Mettraux at openwfe.org
#
require 'monitor'
-require 'openwfe/otime'
-require 'openwfe/utils'
+#require 'openwfe/utils'
+require 'openwfe/util/otime'
+require 'openwfe/util/stoppable'
module OpenWFE
+ #
+ # The Scheduler is used by OpenWFEru for registering 'at' and 'cron' jobs.
+ # 'at' jobs to execute once at a given point in time. 'cron' jobs
+ # execute a specified intervals.
+ # The two main methods are thus schedule_at() and schedule().
+ #
+ # schedule_at() and schedule() await either a Schedulable instance and
+ # params (usually an array or nil), either a block, which is more in the
+ # Ruby way.
+ #
+ # Two examples :
+ #
+ # scheduler.schedule_in("3d") do
+ # regenerate_monthly_report()
+ # end
+ # #
+ # # will call the regenerate_monthly_report method
+ # # in 3 days from now
+ #
+ # and
+ #
+ # class Regenerator < Schedulable
+ # def trigger (frequency)
+ # self.send(frequency)
+ # end
+ # def monthly
+ # # ...
+ # end
+ # def yearly
+ # # ...
+ # end
+ # end
+ #
+ # regenerator = Regenerator.new
+ #
+ # scheduler.schedule_in("4d", r, :monthly)
+ # #
+ # # will regenerate the monthly report in four days
+ #
class Scheduler
- include MonitorMixin
+ include MonitorMixin, Stoppable
attr_accessor \
:precision
def initialize
@@ -67,96 +107,50 @@
# every 250ms, the scheduler wakes up
@last_cron_minute = -1
end
- def stop
- @scheduler_thread.stop \
- if @scheduler_thread and not @scheduler_thread.stop?
- end
-
+ #
+ # Starts this scheduler (or restart it if it was previously stopped)
+ #
def start
- if @scheduler_thread
- @scheduler_thread.wakeup
- return
- end
+ #if @scheduler_thread
+ # @scheduler_thread.wakeup
+ # return
+ #end
+
@scheduler_thread = Thread.new do
while true
+ break if self.is_stopped?
+ #print "."
+ #$stdout.flush
step
sleep(@precision)
end
end
- end
- def step
- synchronize do
- now = Time.new
- minute = now.to_i / 60
-
- #puts "step() minute is #{minute}"
- #puts "step() last_cron_minute is #{@last_cron_minute}"
-
- #
- # cron entries
-
- begin
- if minute > @last_cron_minute
- @last_cron_minute = minute
- @cron_entries.each do |cron_id, cron_entry|
- #puts "step() cron_id : #{cron_id}"
- cron_entry.trigger \
- if cron_entry.matches? now
- end
- end
- rescue Exception => e
- #puts \
- # "step() caught exception\n" +
- # OpenWFE::exception_to_s(e)
- end
-
- #
- # pending jobs
-
- now = now.to_f
- #
- # that's what at jobs do understand
-
- while true
-
- #puts "step() job.count is #{@pending_jobs.length}"
-
- break if @pending_jobs.length < 1
-
- job = @pending_jobs[0]
-
- #puts "step() job.at is #{job.at}"
- #puts "step() now is #{now}"
-
- break if job.at > now
-
- #if job.at <= now
- #
- # obviously
-
- job.trigger()
- @pending_jobs.delete_at(0)
- end
- end
+ do_restart
end
#
- # joins on the scheduler thread
+ # The scheduler is stoppable via stop() or do_stop()
#
+ alias :stop :do_stop
+
+ #
+ # Joins on the scheduler thread
+ #
def join
@scheduler_thread.join
end
#
- # schedules a job by specifying at which time it should trigger
+ # Schedules a job by specifying at which time it should trigger.
+ # Returns the a job_id that can be used to unschedule the job.
#
- def schedule_at (at, schedulable, params)
+ def schedule_at (at, schedulable=nil, params=nil, &block)
synchronize do
#puts "0 at is '#{at.to_s}' (#{at.class})"
at = OpenWFE::to_ruby_time(at) \
@@ -166,13 +160,14 @@
if at.kind_of? DateTime
at = at.to_f \
if at.kind_of? Time
- #puts "1 at is '#{at.to_s}' (#{at.class})"
+ #puts "1 at is '#{at.to_s}' (#{at.class})"}"
- job = JobEntry.new(at, schedulable, params)
+ b = to_block(schedulable, params, &block)
+ job = AtEntry.new(at, &b)
if at < (Time.new.to_f + @precision)
job.trigger()
return nil
end
@@ -191,63 +186,143 @@
return push(job, i)
end
end
return push(job)
-
end
end
#
- # schedules a job by stating in how much time it should trigger
+ # Schedules a job by stating in how much time it should trigger.
+ # Returns the a job_id that can be used to unschedule the job.
#
- def schedule_in (duration, schedulable, params)
+ def schedule_in (duration, schedulable=nil, params=nil, &block)
if duration.kind_of?(String)
duration = OpenWFE::parse_time_string(duration)
elsif not duration.kind_of?(Float)
duration = Float(duration.to_s)
end
- return schedule_at(Time.new.to_f + duration, schedulable, params)
+ return schedule_at(
+ Time.new.to_f + duration, schedulable, params, &block)
end
#
- # unschedules 'at' or 'cron' job
+ # Unschedules an 'at' or a 'cron' job identified by the id
+ # it was given at schedule time.
#
- def unschedule (entry_id)
+ def unschedule (job_id)
synchronize do
for i in 0...@pending_jobs.length
- if @pending_jobs[i].eid == entry_id
+ if @pending_jobs[i].eid == job_id
@pending_jobs.delete_at(i)
return true
end
end
- if @cron_entries.has_key?(entry_id)
- @cron_entries.delete(entry_id)
+ return true if unschedule_cron_job(job_id)
+
+ return false
+ end
+ end
+
+ #
+ # Unschedules a cron job
+ #
+ def unschedule_cron_job (job_id)
+ synchronize do
+ if @cron_entries.has_key?(job_id)
+ @cron_entries.delete(job_id)
return true
end
-
return false
end
end
#
- # schedules a cron job
+ # Schedules a cron job, the 'cron_line' is a string
+ # following the Unix cron standard (see "man 5 crontab" in your command
+ # line).
#
- def schedule (cron_line, schedulable, params)
+ # For example :
+ #
+ # scheduler.schedule("5 0 * * *", nil, s, p)
+ # # will trigger the schedulable s with params p every day
+ # # five minutes after midnight
+ #
+ # scheduler.schedule("15 14 1 * *", nil, s, p)
+ # # will trigger s at 14:15 on the first of every month
+ #
+ # scheduler.schedule("0 22 * * 1-5") do
+ # puts "it's break time..."
+ # end
+ # # outputs a message every weekday at 10pm
+ #
+ # Returns the job id attributed to this 'cron job', this id can
+ # be used to unschedule the job.
+ #
+ def schedule \
+ (cron_line, cron_id=nil, schedulable=nil, params=nil, &block)
+
synchronize do
- entry = CronEntry.new(cron_line, schedulable, params)
+
+ #
+ # is a job with the same id already scheduled ?
+
+ if cron_id
+ if unschedule(cron_id)
+ ldebug do
+ "schedule() unscheduled previous job "+
+ "under same name '#{cron_id}'"
+ end
+ end
+ end
+
+ #
+ # schedule
+
+ b = to_block(schedulable, params, &block)
+ entry = CronEntry.new(cron_id, cron_line, &b)
@cron_entries[entry.eid] = entry
+
return entry.eid
end
end
+ #
+ # Returns the job corresponding to job_id, an instance of AtEntry
+ # or CronEntry will be returned.
+ #
+ def get_job (job_id)
+
+ entry = @cron_entries[job_id]
+ return c if c
+
+ @pending_jobs.each do |entry|
+ return entry if entry.eid == job_id
+ end
+
+ return nil
+ end
+
protected
+ def to_block (schedulable, params, &block)
+ if schedulable
+ lambda do
+ schedulable.trigger(params)
+ end
+ else
+ block
+ end
+ end
+
+ #
+ # Pushes an 'at' job into the pending job list
+ #
def push (job, index=-1)
if index == -1
#
# push job at the end
@@ -262,10 +337,74 @@
#puts "push() at '#{Time.at(job.at)}'"
return job.eid
end
+
+ #
+ # This is the method called each time the scheduler wakes up
+ # (by default 4 times per second). It's meant to quickly
+ # determine if there are jobs to trigger else to get back to sleep.
+ # 'cron' jobs get executed if necessary then 'at' jobs.
+ #
+ def step
+ synchronize do
+ now = Time.new
+ minute = now.min
+
+ #
+ # cron entries
+
+ begin
+ if now.sec == 0 and minute > @last_cron_minute
+ #
+ # only consider cron entries at the second 0 of a
+ # minute
+
+ @last_cron_minute = minute
+
+ @cron_entries.each do |cron_id, cron_entry|
+ #puts "step() cron_id : #{cron_id}"
+ cron_entry.trigger \
+ if cron_entry.matches? now
+ end
+ end
+ rescue Exception => e
+ #puts \
+ # "step() caught exception\n" +
+ # OpenWFE::exception_to_s(e)
+ end
+
+ #
+ # pending jobs
+
+ now = now.to_f
+ #
+ # that's what at jobs do understand
+
+ while true
+
+ #puts "step() job.count is #{@pending_jobs.length}"
+
+ break if @pending_jobs.length < 1
+
+ job = @pending_jobs[0]
+
+ #puts "step() job.at is #{job.at}"
+ #puts "step() now is #{now}"
+
+ break if job.at > now
+
+ #if job.at <= now
+ #
+ # obviously
+
+ job.trigger()
+ @pending_jobs.delete_at(0)
+ end
+ end
+ end
end
#
# This module adds a trigger method to any class that includes it.
# The default implementation feature here triggers an exception.
@@ -273,216 +412,261 @@
module Schedulable
def trigger (params)
raise "trigger() implementation is missing"
end
+
+ def reschedule (scheduler)
+ raise "reschedule() implentation is missing"
+ end
end
- #
- # a 'cron line' is a line in the sense of a crontab (man 5 cron) file
- #
- class CronLine
+ protected
- def initialize (line)
+ JOB_ID_LOCK = Monitor.new
- super()
+ class Entry
- items = line.split
+ @@last_given_id = 0
+ #
+ # as a scheduler is fully transient, no need to
+ # have persistent ids, a simple counter is sufficient
- if items.length != 5
- raise \
- "cron '#{line}' string should hold 5 items, " +
- "not #{items.length}" \
+ attr_accessor \
+ :eid, :block
+
+ def initialize (entry_id=nil, &block)
+ @block = block
+ if entry_id
+ @eid = entry_id
+ else
+ JOB_ID_LOCK.synchronize do
+ @eid = @@last_given_id
+ @@last_given_id = @eid + 1
+ end
+ end
end
- @minutes = parse_item(items[0], 0, 59)
- @hours = parse_item(items[1], 0, 24)
- @days = parse_item(items[2], 1, 31)
- @months = parse_item(items[3], 1, 12)
- @weekdays = parse_item(items[4], 1, 7)
-
- adjust_arrays()
+ #def trigger
+ # @block.call @eid
+ #end
end
- def matches? (time)
+ class AtEntry < Entry
- if time.kind_of?(Float) or time.kind_of?(Integer)
- time = Time.at(time)
+ attr_accessor \
+ :at
+
+ def initialize (at, &block)
+ super(&block)
+ @at = at
end
- return false if no_match?(time.min, @minutes)
- return false if no_match?(time.hour, @hours)
- return false if no_match?(time.day, @days)
- return false if no_match?(time.month, @months)
- return false if no_match?(time.wday, @weekdays)
-
- return true
+ def trigger
+ @block.call @eid, @at
+ end
end
- private
+ class CronEntry < Entry
- #
- # adjust values to Ruby
- #
- def adjust_arrays()
- if @hours
- @hours.each do |h|
- h = 0 if h == 23
- end
+ attr_accessor \
+ :cron_line
+
+ def initialize (cron_id, line, &block)
+
+ super(cron_id, &block)
+
+ if line.kind_of? String
+ @cron_line = CronLine.new(line)
+ elsif line.kind_of? CronLine
+ @cron_line = line
+ else
+ raise \
+ "Cannot initialize a CronEntry " +
+ "with a param of class #{line.class}"
end
- if @weekdays
- @weekdays.each do |wd|
- wd = wd - 1
- end
- end
end
- def parse_item (item, min, max)
+ def matches? (time)
+ @cron_line.matches? time
+ end
- return nil \
- if item == "*"
- return parse_list(item, min, max) \
- if item.index(",") > -1
- return parse_range(item, min, max) \
- if item.index("*") > -1 or item.index("-") > -1
+ def trigger
+ @block.call @eid, @cron_line
+ end
+ end
- i = Integer(item)
+ #
+ # A 'cron line' is a line in the sense of a crontab
+ # (man 5 crontab) file line.
+ #
+ class CronLine
- i = min if i < min
- i = max if i > max
+ attr_reader \
+ :minutes,
+ :hours,
+ :days,
+ :months,
+ :weekdays
- return [ i ]
- end
+ def initialize (line)
- def parse_list (item, min, max)
- items = item.split(",")
- result = []
- items.each do |i|
- i = Integer(i)
- i = min if i < min
- i = max if i > max
- result << i
+ super()
+
+ items = line.split
+
+ if items.length != 5
+ raise \
+ "cron '#{line}' string should hold 5 items, " +
+ "not #{items.length}" \
end
- return result
+
+ @minutes = parse_item(items[0], 0, 59)
+ @hours = parse_item(items[1], 0, 24)
+ @days = parse_item(items[2], 1, 31)
+ @months = parse_item(items[3], 1, 12)
+ @weekdays = parse_weekdays(items[4])
+
+ adjust_arrays()
end
- def parse_range (item, min, max)
- i = item.index("-")
- j = item.index("/")
+ def matches? (time)
- inc = 1
+ if time.kind_of?(Float) or time.kind_of?(Integer)
+ time = Time.at(time)
+ end
- inc = Integer(item[j+1..-1]) if j > -1
+ return false if no_match?(time.min, @minutes)
+ return false if no_match?(time.hour, @hours)
+ return false if no_match?(time.day, @days)
+ return false if no_match?(time.month, @months)
+ return false if no_match?(time.wday, @weekdays)
- istart = -1
- iend = -1
+ return true
+ end
- if i > -1
+ #
+ # Returns an array of 5 arrays (minutes, hours, days, months,
+ # weekdays).
+ # This method is used by the cronline unit tests.
+ #
+ def to_array
+ [ @minutes, @hours, @days, @months, @weekdays ]
+ end
- istart = Integer(item[0..i])
+ private
- if j > -1
- iend = Integer(item[i+1..j])
- else
- iend = Integer(i+1..-1)
+ #
+ # adjust values to Ruby
+ #
+ def adjust_arrays()
+ if @hours
+ @hours.each do |h|
+ h = 0 if h == 23
+ end
end
-
- else # case */x
- istart = min
- iend = max
+ if @weekdays
+ @weekdays.each do |wd|
+ wd = wd - 1
+ end
+ end
end
- istart = min if istart < min
- iend = max if iend > max
+ WDS = [ "mon", "tue", "wed", "thu", "fri", "sat", "sun" ]
+ #
+ # used by parse_weekday()
- result = []
+ def parse_weekdays (item)
- value = istart
- while true
- result << value
- value = value + inc
- break if value > iend
- end
+ item = item.downcase
- return result
- end
+ WDS.each_with_index do |day, index|
+ item = item.gsub(day, "#{index+1}")
+ end
- def no_match? (value, cron_values)
+ return parse_item(item, 1, 7)
+ end
- return false if not cron_values
+ def parse_item (item, min, max)
- cron_values.each do |v|
- return false if value == v
- end
+ return nil \
+ if item == "*"
+ return parse_list(item, min, max) \
+ if item.index(",")
+ return parse_range(item, min, max) \
+ if item.index("*") or item.index("-")
- return true
- end
- end
+ i = Integer(item)
- protected
+ i = min if i < min
+ i = max if i > max
- JOB_ID_LOCK = Monitor.new
+ return [ i ]
+ end
- class Entry
+ def parse_list (item, min, max)
+ items = item.split(",")
+ result = []
+ items.each do |i|
+ i = Integer(i)
+ i = min if i < min
+ i = max if i > max
+ result << i
+ end
+ return result
+ end
- @@last_given_id = 0
- #
- # as a scheduler is fully transient, no need to
- # have persistent ids, a simple counter is sufficient
+ def parse_range (item, min, max)
+ i = item.index("-")
+ j = item.index("/")
- attr_accessor \
- :eid, :schedulable, :params
+ inc = 1
- def initialize (schedulable, params)
- @schedulable = schedulable
- @params = params
- JOB_ID_LOCK.synchronize do
- @eid = @@last_given_id + 1
- @@last_given_id = @eid
- end
- end
+ inc = Integer(item[j+1..-1]) if j
- def trigger
- @schedulable.trigger(params)
- end
- end
+ istart = -1
+ iend = -1
- class JobEntry < Entry
+ if i
- attr_accessor \
- :at
+ istart = Integer(item[0..i-1])
- def initialize (at, schedulable, params)
- super(schedulable, params)
- @at = at
- end
- end
+ if j
+ iend = Integer(item[i+1..j])
+ else
+ iend = Integer(item[i+1..-1])
+ end
- class CronEntry < Entry
+ else # case */x
+ istart = min
+ iend = max
+ end
- attr_accessor \
- :cron_line
+ istart = min if istart < min
+ iend = max if iend > max
- def initialize (line, schedulable, params)
+ result = []
- super(schedulable, params)
+ value = istart
+ while true
+ result << value
+ value = value + inc
+ break if value > iend
+ end
- if line.kind_of?(String)
- @cronline = CronLine.new(line)
- elsif line.kind_of?(CronLine)
- @cronline = line
- else
- raise \
- "Cannot initialize a CronEntry " +
- "with a param of class #{line.class}"
+ return result
end
- @cron_line = CronLine.new(line)
- end
+ def no_match? (value, cron_values)
- def matches? (time)
- @cron_line.matches?(time)
- end
+ return false if not cron_values
+
+ cron_values.each do |v|
+ return false if value == v
+ end
+
+ return true
+ end
end
end