lib/openwfe/util/scheduler.rb in openwferu-0.9.15 vs lib/openwfe/util/scheduler.rb in openwferu-0.9.16
- old
+ new
@@ -1,6 +1,6 @@
-#
+
#--
# Copyright (c) 2006-2007, John Mettraux, OpenWFE.org
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
@@ -51,21 +51,30 @@
#
# schedule_at() and schedule() await either a Schedulable instance and
# params (usually an array or nil), either a block, which is more in the
# Ruby way.
#
- # Two examples :
+ # Some examples :
#
# scheduler.schedule_in("3d") do
# regenerate_monthly_report()
# end
# #
# # will call the regenerate_monthly_report method
# # in 3 days from now
#
- # and
+ # scheduler.schedule "0 22 * * 1-5" do
+ # log.info "activating security system..."
+ # activate_security_system()
+ # end
#
+ # job_id = scheduler.schedule_at "Sun Oct 07 14:24:01 +0900 2009" do
+ # init_self_destruction_sequence()
+ # end
+ #
+ # an example that uses a Schedulable class :
+ #
# class Regenerator < Schedulable
# def trigger (frequency)
# self.send(frequency)
# end
# def monthly
@@ -100,19 +109,53 @@
# Use with care though, if you create a scheduler, set this attribute
# to true and start the scheduler, the scheduler will immediately exit.
# This attribute is best used indirectly : the method
# join_until_no_more_jobs() wraps it.
#
- # Since OpenWFEru 0.9.13, the :scheduler_precision can be set when
- # instantiating the scheduler.
+ # The :scheduler_precision can be set when instantiating the scheduler.
#
# scheduler = OpenWFE::Scheduler.new(:scheduler_precision => 0.500)
# scheduler.start
# #
# # instatiates a scheduler that checks its jobs twice per second
# # (the default is 4 times per second (0.250))
#
+ #
+ # Since OpenWFEru 0.9.16, tags can be attached to jobs scheduled :
+ #
+ # scheduler.schedule_in "2h", :tags => "backup" do
+ # init_backup_sequence()
+ # end
+ #
+ # scheduler.schedule "0 24 * * *", :tags => "new_day" do
+ # do_this_or_that()
+ # end
+ #
+ # jobs = find_jobs 'backup'
+ # jobs.each { |job| job.unschedule }
+ #
+ # Multiple tags may be attached to a single job :
+ #
+ # scheduler.schedule_in "2h", :tags => [ "backup", "important" ] do
+ # init_backup_sequence()
+ # end
+ #
+ # The vanilla case for tags assume they are String instances, but nothing
+ # prevents you from using anything else. The scheduler has no persistence
+ # by itself, so no serialization issue.
+ #
+ #
+ # Since OpenWFEru 0.9.16, a cron schedule can be set at the second level :
+ #
+ # scheduler.schedule "7 * * * * *" do
+ # puts "it's now the seventh second of the minute"
+ # end
+ #
+ # The OpenWFEru scheduler recognizes an optional first column for second
+ # scheduling. This column can, like for the other columns, specify a
+ # value ("7"), a list of values ("7,8,9,27") or a range ("7-12").
+ #
class Scheduler
include MonitorMixin
#
# By default, the precision is 0.250, with means the scheduler
@@ -129,11 +172,11 @@
def initialize (params={})
super()
@pending_jobs = []
- @cron_entries = {}
+ @cron_jobs = {}
@scheduler_thread = nil
@precision = 0.250
# every 250ms, the scheduler wakes up (default value)
@@ -144,11 +187,11 @@
end
@exit_when_no_more_jobs = false
@dont_reschedule_every = false
- @last_cron_minute = -1
+ @last_cron_second = -1
@stopped = true
end
#
@@ -159,12 +202,15 @@
@stopped = false
@scheduler_thread = Thread.new do
if defined?(JRUBY_VERSION)
+
require 'java'
- java.lang.Thread.current_thread.name = "openwferu scheduler (Ruby Thread)"
+
+ java.lang.Thread.current_thread.name = \
+ "openwferu scheduler (Ruby Thread)"
end
while true
break if @stopped
step
@@ -250,43 +296,43 @@
#
# Schedules a job in a loop. After an execution, it will not execute
# before the time specified in 'freq'.
#
- # Note that if your job takes 2s to execute and the freq is set to
- # 10s, it will in fact execute every 12s.
- # You can however wrap the code within its own thread :
- #
- # scheduler.schedule_every("12s") do
- # Thread.new do
- # do_the_job()
- # end
- # end
- #
# This method returns a job identifier which can be used to unschedule()
# the job.
#
def schedule_every (freq, params={}, &block)
f = duration_to_f freq
params = prepare_params params
schedulable = params[:schedulable]
- params[:every] = true
+ params[:every] = freq
- sschedule_at Time.new.to_f + f, params do |job_id, at|
+ last_at = params[:last_at]
+ next_at = if last_at
+ last_at + f
+ else
+ Time.now.to_f + f
+ end
- params[:job_id] = job_id
+ sschedule_at next_at, params do |job_id, at|
if schedulable
schedulable.trigger(params)
else
block.call job_id, at
end
+
+ params[:job_id] = job_id
+ params[:last_at] = at
- schedule_every(f, params, &block) \
+ schedule_every(freq, params, &block) \
unless @dont_reschedule_every
+ #
+ # yes, this is a kind of recursion
job_id
end
end
@@ -296,11 +342,11 @@
#
def unschedule (job_id)
synchronize do
for i in 0...@pending_jobs.length
- if @pending_jobs[i].eid == job_id
+ if @pending_jobs[i].job_id == job_id
@pending_jobs.delete_at i
return true
end
end
@@ -311,12 +357,12 @@
#
# Unschedules a cron job
#
def unschedule_cron_job (job_id)
synchronize do
- if @cron_entries.has_key?(job_id)
- @cron_entries.delete job_id
+ if @cron_jobs.has_key?(job_id)
+ @cron_jobs.delete job_id
return true
end
false
end
end
@@ -361,28 +407,30 @@
#
# schedule
b = to_block(params, &block)
- entry = CronEntry.new(cron_id, cron_line, &b)
- @cron_entries[entry.eid] = entry
+ job = CronJob.new(self, cron_id, cron_line, params, &b)
+ @cron_jobs[job.job_id] = job
- entry.eid
+ job.job_id
end
end
#
- # Returns the job corresponding to job_id, an instance of AtEntry
- # or CronEntry will be returned.
+ # Returns the job corresponding to job_id, an instance of AtJob
+ # or CronJob will be returned.
#
def get_job (job_id)
- entry = @cron_entries[job_id]
- return entry if entry
+ job = @cron_jobs[job_id]
+ return job if job
- @pending_jobs.find do |entry|
- entry.eid == job_id
+ synchronize do
+ @pending_jobs.find do |job|
+ job.job_id == job_id
+ end
end
end
#
# Finds a job (via get_job()) and then returns the wrapped
@@ -392,16 +440,51 @@
return nil unless job_id
j = get_job(job_id)
- return j.schedulable if j.respond_to? :schedulable
+ return j.schedulable if j.respond_to?(:schedulable)
nil
end
#
+ # Returns an array of jobs that have the given tag.
+ #
+ def find_jobs (tag)
+
+ result = @cron_jobs.values.find_all do |job|
+ job.has_tag?(tag)
+ end
+
+ synchronize do
+ result + @pending_jobs.find_all do |job|
+ job.has_tag?(tag)
+ end
+ end
+ end
+
+ #
+ # Finds the jobs with the given tag and then returns an array of
+ # the wrapped Schedulable objects.
+ # Jobs that haven't a wrapped Schedulable won't be included in the
+ # result.
+ #
+ def find_schedulables (tag)
+
+ jobs = find_jobs(tag)
+
+ result = []
+
+ jobs.each do |job|
+ result.push(job.schedulable) if job.respond_to?(:schedulable)
+ end
+
+ result
+ end
+
+ #
# Returns the number of currently pending jobs in this scheduler
# ('at' jobs and 'every' jobs).
#
def pending_job_count
@pending_jobs.size
@@ -409,25 +492,25 @@
#
# Returns the number of cron jobs currently active in this scheduler.
#
def cron_job_count
- @cron_entries.size
+ @cron_jobs.size
end
#
# Returns the current count of 'every' jobs scheduled.
#
def every_job_count
- @pending_jobs.select { |j| j.is_a?(EveryEntry) }.size
+ @pending_jobs.select { |j| j.is_a?(EveryJob) }.size
end
#
# Returns the current count of 'at' jobs scheduled (not 'every').
#
def at_job_count
- @pending_jobs.select { |j| j.instance_of?(AtEntry) }.size
+ @pending_jobs.select { |j| j.instance_of?(AtJob) }.size
end
#
# Returns true if the given string seems to be a cron string.
#
@@ -465,20 +548,21 @@
if at.kind_of?(Time)
#puts "1 at is '#{at.to_s}' (#{at.class})"}"
jobClass = if params[:every]
- EveryEntry
+ EveryJob
else
- AtEntry
+ AtJob
end
job_id = params[:job_id]
b = to_block(params, &block)
- job = jobClass.new(at, job_id, &b)
+ job = jobClass.new(self, at, job_id, params, &b)
+
unschedule(job_id) if job_id
if at < (Time.new.to_f + @precision)
job.trigger() unless params[:discard_past]
return nil
@@ -559,11 +643,11 @@
@pending_jobs[index, 0] = job
end
#puts "push() at '#{Time.at(job.at)}'"
- job.eid
+ job.job_id
end
#
# This is the method called each time the scheduler wakes up
# (by default 4 times per second). It's meant to quickly
@@ -572,11 +656,10 @@
#
def step
synchronize do
now = Time.new
- minute = now.min
if @exit_when_no_more_jobs
if @pending_jobs.size < 1
@@ -585,27 +668,27 @@
end
@dont_reschedule_every = true if at_job_count < 1
end
+ # TODO : eventually consider running cron / pending
+ # job triggering in two different threads
#
- # cron entries
+ # but well... there's the synchronization issue...
- if now.sec == 0 and
- (minute > @last_cron_minute or
- @last_cron_minute == 59)
- #
- # only consider cron entries at the second 0 of a
- # minute
+ #
+ # cron jobs
- @last_cron_minute = minute
+ if now.sec != @last_cron_second
- #puts "step() @cron_entries.size #{@cron_entries.size}"
+ @last_cron_second = now.sec
- @cron_entries.each do |cron_id, cron_entry|
+ #puts "step() @cron_jobs.size #{@cron_jobs.size}"
+
+ @cron_jobs.each do |cron_id, cron_job|
#puts "step() cron_id : #{cron_id}"
- trigger(cron_entry) if cron_entry.matches? now
+ trigger(cron_job) if cron_job.matches?(now)
end
end
#
# pending jobs
@@ -629,21 +712,30 @@
#if job.at <= now
#
# obviously
- trigger(job)
+ trigger job
@pending_jobs.delete_at(0)
end
end
end
- def trigger (entry)
+ #
+ # Triggers the job (in a dedicated thread).
+ #
+ # If an error occurs in the job, it well get caught and an error
+ # message will be displayed to STDOUT.
+ # If this scheduler provides a lwarn(message) method, it will
+ # be used insted.
+ #
+ def trigger (job)
+
Thread.new do
begin
- entry.trigger
+ job.trigger
rescue Exception => e
message =
"trigger() caught exception\n" +
OpenWFE::exception_to_s(e)
if self.respond_to? :lwarn
@@ -672,160 +764,296 @@
end
protected
JOB_ID_LOCK = Monitor.new
+ #
+ # would it be better to use a Mutex instead of a full-blown
+ # Monitor ?
- class Entry
+ #
+ # The parent class for scheduled jobs.
+ #
+ class Job
@@last_given_id = 0
#
# as a scheduler is fully transient, no need to
# have persistent ids, a simple counter is sufficient
- attr_accessor \
- :eid, :block
+ #
+ # The identifier for the job
+ #
+ attr_accessor :job_id
- def initialize (entry_id=nil, &block)
+ #
+ # An array of tags
+ #
+ attr_accessor :tags
+
+ #
+ # The block to execute at trigger time
+ #
+ attr_accessor :block
+
+ #
+ # A reference to the scheduler
+ #
+ attr_reader :scheduler
+
+ #
+ # Keeping a copy of the initialization params of the job.
+ #
+ attr_reader :params
+
+
+ def initialize (scheduler, job_id, params, &block)
+
+ @scheduler = scheduler
@block = block
- if entry_id
- @eid = entry_id
+
+ if job_id
+ @job_id = job_id
else
JOB_ID_LOCK.synchronize do
- @eid = @@last_given_id
- @@last_given_id = @eid + 1
+ @job_id = @@last_given_id
+ @@last_given_id = @job_id + 1
end
end
+
+ @params = params
+
+ #@tags = Array(tags).collect { |tag| tag.to_s }
+ # making sure we have an array of String tags
+
+ @tags = Array(params[:tags])
+ # any tag is OK
end
- #def trigger
- # @block.call @eid
- #end
+ #
+ # Returns true if this job sports the given tag
+ #
+ def has_tag? (tag)
+
+ @tags.include?(tag)
+ end
+
+ #
+ # Removes (cancels) this job from its scheduler.
+ #
+ def unschedule
+
+ @scheduler.unschedule(@job_id)
+ end
end
- class AtEntry < Entry
+ #
+ # An 'at' job.
+ class AtJob < Job
- attr_accessor \
- :at
+ #
+ # The float representation (Time.to_f) of the time at which
+ # the job should be triggered.
+ #
+ attr_accessor :at
- def initialize (at, at_id, &block)
- super(at_id, &block)
+ #
+ # The constructor.
+ #
+ def initialize (scheduler, at, at_id, params, &block)
+
+ super(scheduler, at_id, params, &block)
@at = at
end
+ #
+ # Triggers the job (calls the block)
+ #
def trigger
- @block.call @eid, @at
+
+ @block.call @job_id, @at
end
+
+ #
+ # Returns the Time instance at which this job is scheduled.
+ #
+ def schedule_info
+
+ Time.at(@at)
+ end
end
- class EveryEntry < AtEntry
+ #
+ # An 'every' job is simply an extension of an 'at' job.
+ #
+ class EveryJob < AtJob
+
+ #
+ # Returns the frequency string used to schedule this EveryJob,
+ # like for example "3d" or "1M10d3h".
+ #
+ def schedule_info
+
+ @params[:every]
+ end
end
- class CronEntry < Entry
+ #
+ # A cron job.
+ #
+ class CronJob < Job
- attr_accessor \
- :cron_line
+ #
+ # The CronLine instance representing the times at which
+ # the cron job has to be triggered.
+ #
+ attr_accessor :cron_line
- def initialize (cron_id, line, &block)
+ def initialize (scheduler, cron_id, line, params, &block)
- super(cron_id, &block)
+ super(scheduler, cron_id, params, &block)
- if line.kind_of?(String)
+ if line.is_a?(String)
+
@cron_line = CronLine.new(line)
- elsif line.kind_of?(CronLine)
+
+ elsif line.is_a?(CronLine)
+
@cron_line = line
+
else
+
raise \
- "Cannot initialize a CronEntry " +
+ "Cannot initialize a CronJob " +
"with a param of class #{line.class}"
end
end
+ #
+ # This is the method called by the scheduler to determine if it
+ # has to fire this CronJob instance.
+ #
def matches? (time)
+
@cron_line.matches? time
end
+ #
+ # As the name implies.
+ #
def trigger
- @block.call @eid, @cron_line
+
+ @block.call @job_id, @cron_line
end
+
+ #
+ # Returns the original cron tab string used to schedule this
+ # Job. Like for example "60/3 * * * Sun".
+ #
+ def schedule_info
+
+ @cron_line.original
+ end
end
#
# A 'cron line' is a line in the sense of a crontab
# (man 5 crontab) file line.
#
class CronLine
+ #
+ # The string used for creating this cronline instance.
+ #
+ attr_reader :original
+
attr_reader \
+ :seconds,
:minutes,
:hours,
:days,
:months,
:weekdays
def initialize (line)
super()
+ @original = line
+
items = line.split
- if items.length != 5
+ unless [ 5, 6 ].include?(items.length)
raise \
- "cron '#{line}' string should hold 5 items, " +
+ "cron '#{line}' string should hold 5 or 6 items, " +
"not #{items.length}" \
end
- @minutes = parse_item(items[0], 0, 59)
- @hours = parse_item(items[1], 0, 24)
- @days = parse_item(items[2], 1, 31)
- @months = parse_item(items[3], 1, 12)
- @weekdays = parse_weekdays(items[4])
+ offset = items.length - 5
- adjust_arrays()
+ @seconds = if offset == 1
+ parse_item(items[0], 0, 59)
+ else
+ [ 0 ]
+ end
+ @minutes = parse_item(items[0+offset], 0, 59)
+ @hours = parse_item(items[1+offset], 0, 24)
+ @days = parse_item(items[2+offset], 1, 31)
+ @months = parse_item(items[3+offset], 1, 12)
+ @weekdays = parse_weekdays(items[4+offset])
+
+ #adjust_arrays()
end
+ #
+ # Returns true if the given time matches this cron line.
+ #
def matches? (time)
if time.kind_of?(Float) or time.kind_of?(Integer)
time = Time.at(time)
end
+ return false if no_match?(time.sec, @seconds)
return false if no_match?(time.min, @minutes)
return false if no_match?(time.hour, @hours)
return false if no_match?(time.day, @days)
return false if no_match?(time.month, @months)
return false if no_match?(time.wday, @weekdays)
- return true
+ true
end
#
- # Returns an array of 5 arrays (minutes, hours, days, months,
- # weekdays).
+ # Returns an array of 6 arrays (seconds, minutes, hours, days,
+ # months, weekdays).
# This method is used by the cronline unit tests.
#
def to_array
- [ @minutes, @hours, @days, @months, @weekdays ]
+ [ @seconds, @minutes, @hours, @days, @months, @weekdays ]
end
private
- #
+ #--
# adjust values to Ruby
#
- def adjust_arrays()
- if @hours
- @hours.each do |h|
- h = 0 if h == 23
- end
- end
- if @weekdays
- @weekdays.each do |wd|
- wd = wd - 1
- end
- end
- end
+ #def adjust_arrays()
+ # @hours = @hours.collect { |h|
+ # if h == 24
+ # 0
+ # else
+ # h
+ # end
+ # } if @hours
+ # @weekdays = @weekdays.collect { |wd|
+ # wd - 1
+ # } if @weekdays
+ #end
+ #
+ # dead code, keeping as a reminder
+ #++
WDS = [ "mon", "tue", "wed", "thu", "fri", "sat", "sun" ]
#
# used by parse_weekday()
@@ -835,11 +1063,11 @@
WDS.each_with_index do |day, index|
item = item.gsub(day, "#{index+1}")
end
- return parse_item(item, 1, 7)
+ parse_item(item, 1, 7)
end
def parse_item (item, min, max)
return nil \
@@ -852,26 +1080,28 @@
i = Integer(item)
i = min if i < min
i = max if i > max
- return [ i ]
+ [ i ]
end
def parse_list (item, min, max)
+
items = item.split(",")
result = []
items.each do |i|
i = Integer(i)
i = min if i < min
i = max if i > max
result << i
end
- return result
+ result
end
def parse_range (item, min, max)
+
i = item.index("-")
j = item.index("/")
inc = 1
@@ -889,10 +1119,11 @@
else
iend = Integer(item[i+1..-1])
end
else # case */x
+
istart = min
iend = max
end
istart = min if istart < min
@@ -900,26 +1131,27 @@
result = []
value = istart
while true
+
result << value
value = value + inc
break if value > iend
end
- return result
+ result
end
def no_match? (value, cron_values)
return false if not cron_values
cron_values.each do |v|
return false if value == v
end
- return true
+ true
end
end
end