lib/openwfe/util/scheduler.rb in openwferu-0.9.15 vs lib/openwfe/util/scheduler.rb in openwferu-0.9.16

- old
+ new

@@ -1,6 +1,6 @@ -# + #-- # Copyright (c) 2006-2007, John Mettraux, OpenWFE.org # All rights reserved. # # Redistribution and use in source and binary forms, with or without @@ -51,21 +51,30 @@ # # schedule_at() and schedule() await either a Schedulable instance and # params (usually an array or nil), either a block, which is more in the # Ruby way. # - # Two examples : + # Some examples : # # scheduler.schedule_in("3d") do # regenerate_monthly_report() # end # # # # will call the regenerate_monthly_report method # # in 3 days from now # - # and + # scheduler.schedule "0 22 * * 1-5" do + # log.info "activating security system..." + # activate_security_system() + # end # + # job_id = scheduler.schedule_at "Sun Oct 07 14:24:01 +0900 2009" do + # init_self_destruction_sequence() + # end + # + # an example that uses a Schedulable class : + # # class Regenerator < Schedulable # def trigger (frequency) # self.send(frequency) # end # def monthly @@ -100,19 +109,53 @@ # Use with care though, if you create a scheduler, set this attribute # to true and start the scheduler, the scheduler will immediately exit. # This attribute is best used indirectly : the method # join_until_no_more_jobs() wraps it. # - # Since OpenWFEru 0.9.13, the :scheduler_precision can be set when - # instantiating the scheduler. + # The :scheduler_precision can be set when instantiating the scheduler. # # scheduler = OpenWFE::Scheduler.new(:scheduler_precision => 0.500) # scheduler.start # # # # instatiates a scheduler that checks its jobs twice per second # # (the default is 4 times per second (0.250)) # + # + # Since OpenWFEru 0.9.16, tags can be attached to jobs scheduled : + # + # scheduler.schedule_in "2h", :tags => "backup" do + # init_backup_sequence() + # end + # + # scheduler.schedule "0 24 * * *", :tags => "new_day" do + # do_this_or_that() + # end + # + # jobs = find_jobs 'backup' + # jobs.each { |job| job.unschedule } + # + # Multiple tags may be attached to a single job : + # + # scheduler.schedule_in "2h", :tags => [ "backup", "important" ] do + # init_backup_sequence() + # end + # + # The vanilla case for tags assume they are String instances, but nothing + # prevents you from using anything else. The scheduler has no persistence + # by itself, so no serialization issue. + # + # + # Since OpenWFEru 0.9.16, a cron schedule can be set at the second level : + # + # scheduler.schedule "7 * * * * *" do + # puts "it's now the seventh second of the minute" + # end + # + # The OpenWFEru scheduler recognizes an optional first column for second + # scheduling. This column can, like for the other columns, specify a + # value ("7"), a list of values ("7,8,9,27") or a range ("7-12"). + # class Scheduler include MonitorMixin # # By default, the precision is 0.250, with means the scheduler @@ -129,11 +172,11 @@ def initialize (params={}) super() @pending_jobs = [] - @cron_entries = {} + @cron_jobs = {} @scheduler_thread = nil @precision = 0.250 # every 250ms, the scheduler wakes up (default value) @@ -144,11 +187,11 @@ end @exit_when_no_more_jobs = false @dont_reschedule_every = false - @last_cron_minute = -1 + @last_cron_second = -1 @stopped = true end # @@ -159,12 +202,15 @@ @stopped = false @scheduler_thread = Thread.new do if defined?(JRUBY_VERSION) + require 'java' - java.lang.Thread.current_thread.name = "openwferu scheduler (Ruby Thread)" + + java.lang.Thread.current_thread.name = \ + "openwferu scheduler (Ruby Thread)" end while true break if @stopped step @@ -250,43 +296,43 @@ # # Schedules a job in a loop. After an execution, it will not execute # before the time specified in 'freq'. # - # Note that if your job takes 2s to execute and the freq is set to - # 10s, it will in fact execute every 12s. - # You can however wrap the code within its own thread : - # - # scheduler.schedule_every("12s") do - # Thread.new do - # do_the_job() - # end - # end - # # This method returns a job identifier which can be used to unschedule() # the job. # def schedule_every (freq, params={}, &block) f = duration_to_f freq params = prepare_params params schedulable = params[:schedulable] - params[:every] = true + params[:every] = freq - sschedule_at Time.new.to_f + f, params do |job_id, at| + last_at = params[:last_at] + next_at = if last_at + last_at + f + else + Time.now.to_f + f + end - params[:job_id] = job_id + sschedule_at next_at, params do |job_id, at| if schedulable schedulable.trigger(params) else block.call job_id, at end + + params[:job_id] = job_id + params[:last_at] = at - schedule_every(f, params, &block) \ + schedule_every(freq, params, &block) \ unless @dont_reschedule_every + # + # yes, this is a kind of recursion job_id end end @@ -296,11 +342,11 @@ # def unschedule (job_id) synchronize do for i in 0...@pending_jobs.length - if @pending_jobs[i].eid == job_id + if @pending_jobs[i].job_id == job_id @pending_jobs.delete_at i return true end end @@ -311,12 +357,12 @@ # # Unschedules a cron job # def unschedule_cron_job (job_id) synchronize do - if @cron_entries.has_key?(job_id) - @cron_entries.delete job_id + if @cron_jobs.has_key?(job_id) + @cron_jobs.delete job_id return true end false end end @@ -361,28 +407,30 @@ # # schedule b = to_block(params, &block) - entry = CronEntry.new(cron_id, cron_line, &b) - @cron_entries[entry.eid] = entry + job = CronJob.new(self, cron_id, cron_line, params, &b) + @cron_jobs[job.job_id] = job - entry.eid + job.job_id end end # - # Returns the job corresponding to job_id, an instance of AtEntry - # or CronEntry will be returned. + # Returns the job corresponding to job_id, an instance of AtJob + # or CronJob will be returned. # def get_job (job_id) - entry = @cron_entries[job_id] - return entry if entry + job = @cron_jobs[job_id] + return job if job - @pending_jobs.find do |entry| - entry.eid == job_id + synchronize do + @pending_jobs.find do |job| + job.job_id == job_id + end end end # # Finds a job (via get_job()) and then returns the wrapped @@ -392,16 +440,51 @@ return nil unless job_id j = get_job(job_id) - return j.schedulable if j.respond_to? :schedulable + return j.schedulable if j.respond_to?(:schedulable) nil end # + # Returns an array of jobs that have the given tag. + # + def find_jobs (tag) + + result = @cron_jobs.values.find_all do |job| + job.has_tag?(tag) + end + + synchronize do + result + @pending_jobs.find_all do |job| + job.has_tag?(tag) + end + end + end + + # + # Finds the jobs with the given tag and then returns an array of + # the wrapped Schedulable objects. + # Jobs that haven't a wrapped Schedulable won't be included in the + # result. + # + def find_schedulables (tag) + + jobs = find_jobs(tag) + + result = [] + + jobs.each do |job| + result.push(job.schedulable) if job.respond_to?(:schedulable) + end + + result + end + + # # Returns the number of currently pending jobs in this scheduler # ('at' jobs and 'every' jobs). # def pending_job_count @pending_jobs.size @@ -409,25 +492,25 @@ # # Returns the number of cron jobs currently active in this scheduler. # def cron_job_count - @cron_entries.size + @cron_jobs.size end # # Returns the current count of 'every' jobs scheduled. # def every_job_count - @pending_jobs.select { |j| j.is_a?(EveryEntry) }.size + @pending_jobs.select { |j| j.is_a?(EveryJob) }.size end # # Returns the current count of 'at' jobs scheduled (not 'every'). # def at_job_count - @pending_jobs.select { |j| j.instance_of?(AtEntry) }.size + @pending_jobs.select { |j| j.instance_of?(AtJob) }.size end # # Returns true if the given string seems to be a cron string. # @@ -465,20 +548,21 @@ if at.kind_of?(Time) #puts "1 at is '#{at.to_s}' (#{at.class})"}" jobClass = if params[:every] - EveryEntry + EveryJob else - AtEntry + AtJob end job_id = params[:job_id] b = to_block(params, &block) - job = jobClass.new(at, job_id, &b) + job = jobClass.new(self, at, job_id, params, &b) + unschedule(job_id) if job_id if at < (Time.new.to_f + @precision) job.trigger() unless params[:discard_past] return nil @@ -559,11 +643,11 @@ @pending_jobs[index, 0] = job end #puts "push() at '#{Time.at(job.at)}'" - job.eid + job.job_id end # # This is the method called each time the scheduler wakes up # (by default 4 times per second). It's meant to quickly @@ -572,11 +656,10 @@ # def step synchronize do now = Time.new - minute = now.min if @exit_when_no_more_jobs if @pending_jobs.size < 1 @@ -585,27 +668,27 @@ end @dont_reschedule_every = true if at_job_count < 1 end + # TODO : eventually consider running cron / pending + # job triggering in two different threads # - # cron entries + # but well... there's the synchronization issue... - if now.sec == 0 and - (minute > @last_cron_minute or - @last_cron_minute == 59) - # - # only consider cron entries at the second 0 of a - # minute + # + # cron jobs - @last_cron_minute = minute + if now.sec != @last_cron_second - #puts "step() @cron_entries.size #{@cron_entries.size}" + @last_cron_second = now.sec - @cron_entries.each do |cron_id, cron_entry| + #puts "step() @cron_jobs.size #{@cron_jobs.size}" + + @cron_jobs.each do |cron_id, cron_job| #puts "step() cron_id : #{cron_id}" - trigger(cron_entry) if cron_entry.matches? now + trigger(cron_job) if cron_job.matches?(now) end end # # pending jobs @@ -629,21 +712,30 @@ #if job.at <= now # # obviously - trigger(job) + trigger job @pending_jobs.delete_at(0) end end end - def trigger (entry) + # + # Triggers the job (in a dedicated thread). + # + # If an error occurs in the job, it well get caught and an error + # message will be displayed to STDOUT. + # If this scheduler provides a lwarn(message) method, it will + # be used insted. + # + def trigger (job) + Thread.new do begin - entry.trigger + job.trigger rescue Exception => e message = "trigger() caught exception\n" + OpenWFE::exception_to_s(e) if self.respond_to? :lwarn @@ -672,160 +764,296 @@ end protected JOB_ID_LOCK = Monitor.new + # + # would it be better to use a Mutex instead of a full-blown + # Monitor ? - class Entry + # + # The parent class for scheduled jobs. + # + class Job @@last_given_id = 0 # # as a scheduler is fully transient, no need to # have persistent ids, a simple counter is sufficient - attr_accessor \ - :eid, :block + # + # The identifier for the job + # + attr_accessor :job_id - def initialize (entry_id=nil, &block) + # + # An array of tags + # + attr_accessor :tags + + # + # The block to execute at trigger time + # + attr_accessor :block + + # + # A reference to the scheduler + # + attr_reader :scheduler + + # + # Keeping a copy of the initialization params of the job. + # + attr_reader :params + + + def initialize (scheduler, job_id, params, &block) + + @scheduler = scheduler @block = block - if entry_id - @eid = entry_id + + if job_id + @job_id = job_id else JOB_ID_LOCK.synchronize do - @eid = @@last_given_id - @@last_given_id = @eid + 1 + @job_id = @@last_given_id + @@last_given_id = @job_id + 1 end end + + @params = params + + #@tags = Array(tags).collect { |tag| tag.to_s } + # making sure we have an array of String tags + + @tags = Array(params[:tags]) + # any tag is OK end - #def trigger - # @block.call @eid - #end + # + # Returns true if this job sports the given tag + # + def has_tag? (tag) + + @tags.include?(tag) + end + + # + # Removes (cancels) this job from its scheduler. + # + def unschedule + + @scheduler.unschedule(@job_id) + end end - class AtEntry < Entry + # + # An 'at' job. + class AtJob < Job - attr_accessor \ - :at + # + # The float representation (Time.to_f) of the time at which + # the job should be triggered. + # + attr_accessor :at - def initialize (at, at_id, &block) - super(at_id, &block) + # + # The constructor. + # + def initialize (scheduler, at, at_id, params, &block) + + super(scheduler, at_id, params, &block) @at = at end + # + # Triggers the job (calls the block) + # def trigger - @block.call @eid, @at + + @block.call @job_id, @at end + + # + # Returns the Time instance at which this job is scheduled. + # + def schedule_info + + Time.at(@at) + end end - class EveryEntry < AtEntry + # + # An 'every' job is simply an extension of an 'at' job. + # + class EveryJob < AtJob + + # + # Returns the frequency string used to schedule this EveryJob, + # like for example "3d" or "1M10d3h". + # + def schedule_info + + @params[:every] + end end - class CronEntry < Entry + # + # A cron job. + # + class CronJob < Job - attr_accessor \ - :cron_line + # + # The CronLine instance representing the times at which + # the cron job has to be triggered. + # + attr_accessor :cron_line - def initialize (cron_id, line, &block) + def initialize (scheduler, cron_id, line, params, &block) - super(cron_id, &block) + super(scheduler, cron_id, params, &block) - if line.kind_of?(String) + if line.is_a?(String) + @cron_line = CronLine.new(line) - elsif line.kind_of?(CronLine) + + elsif line.is_a?(CronLine) + @cron_line = line + else + raise \ - "Cannot initialize a CronEntry " + + "Cannot initialize a CronJob " + "with a param of class #{line.class}" end end + # + # This is the method called by the scheduler to determine if it + # has to fire this CronJob instance. + # def matches? (time) + @cron_line.matches? time end + # + # As the name implies. + # def trigger - @block.call @eid, @cron_line + + @block.call @job_id, @cron_line end + + # + # Returns the original cron tab string used to schedule this + # Job. Like for example "60/3 * * * Sun". + # + def schedule_info + + @cron_line.original + end end # # A 'cron line' is a line in the sense of a crontab # (man 5 crontab) file line. # class CronLine + # + # The string used for creating this cronline instance. + # + attr_reader :original + attr_reader \ + :seconds, :minutes, :hours, :days, :months, :weekdays def initialize (line) super() + @original = line + items = line.split - if items.length != 5 + unless [ 5, 6 ].include?(items.length) raise \ - "cron '#{line}' string should hold 5 items, " + + "cron '#{line}' string should hold 5 or 6 items, " + "not #{items.length}" \ end - @minutes = parse_item(items[0], 0, 59) - @hours = parse_item(items[1], 0, 24) - @days = parse_item(items[2], 1, 31) - @months = parse_item(items[3], 1, 12) - @weekdays = parse_weekdays(items[4]) + offset = items.length - 5 - adjust_arrays() + @seconds = if offset == 1 + parse_item(items[0], 0, 59) + else + [ 0 ] + end + @minutes = parse_item(items[0+offset], 0, 59) + @hours = parse_item(items[1+offset], 0, 24) + @days = parse_item(items[2+offset], 1, 31) + @months = parse_item(items[3+offset], 1, 12) + @weekdays = parse_weekdays(items[4+offset]) + + #adjust_arrays() end + # + # Returns true if the given time matches this cron line. + # def matches? (time) if time.kind_of?(Float) or time.kind_of?(Integer) time = Time.at(time) end + return false if no_match?(time.sec, @seconds) return false if no_match?(time.min, @minutes) return false if no_match?(time.hour, @hours) return false if no_match?(time.day, @days) return false if no_match?(time.month, @months) return false if no_match?(time.wday, @weekdays) - return true + true end # - # Returns an array of 5 arrays (minutes, hours, days, months, - # weekdays). + # Returns an array of 6 arrays (seconds, minutes, hours, days, + # months, weekdays). # This method is used by the cronline unit tests. # def to_array - [ @minutes, @hours, @days, @months, @weekdays ] + [ @seconds, @minutes, @hours, @days, @months, @weekdays ] end private - # + #-- # adjust values to Ruby # - def adjust_arrays() - if @hours - @hours.each do |h| - h = 0 if h == 23 - end - end - if @weekdays - @weekdays.each do |wd| - wd = wd - 1 - end - end - end + #def adjust_arrays() + # @hours = @hours.collect { |h| + # if h == 24 + # 0 + # else + # h + # end + # } if @hours + # @weekdays = @weekdays.collect { |wd| + # wd - 1 + # } if @weekdays + #end + # + # dead code, keeping as a reminder + #++ WDS = [ "mon", "tue", "wed", "thu", "fri", "sat", "sun" ] # # used by parse_weekday() @@ -835,11 +1063,11 @@ WDS.each_with_index do |day, index| item = item.gsub(day, "#{index+1}") end - return parse_item(item, 1, 7) + parse_item(item, 1, 7) end def parse_item (item, min, max) return nil \ @@ -852,26 +1080,28 @@ i = Integer(item) i = min if i < min i = max if i > max - return [ i ] + [ i ] end def parse_list (item, min, max) + items = item.split(",") result = [] items.each do |i| i = Integer(i) i = min if i < min i = max if i > max result << i end - return result + result end def parse_range (item, min, max) + i = item.index("-") j = item.index("/") inc = 1 @@ -889,10 +1119,11 @@ else iend = Integer(item[i+1..-1]) end else # case */x + istart = min iend = max end istart = min if istart < min @@ -900,26 +1131,27 @@ result = [] value = istart while true + result << value value = value + inc break if value > iend end - return result + result end def no_match? (value, cron_values) return false if not cron_values cron_values.each do |v| return false if value == v end - return true + true end end end