lib/openwfe/util/scheduler.rb in openwferu-scheduler-0.9.15.1110 vs lib/openwfe/util/scheduler.rb in openwferu-scheduler-0.9.15.1127

- old
+ new

@@ -1,6 +1,6 @@ -# + #-- # Copyright (c) 2006-2007, John Mettraux, OpenWFE.org # All rights reserved. # # Redistribution and use in source and binary forms, with or without @@ -100,19 +100,41 @@ # Use with care though, if you create a scheduler, set this attribute # to true and start the scheduler, the scheduler will immediately exit. # This attribute is best used indirectly : the method # join_until_no_more_jobs() wraps it. # - # Since OpenWFEru 0.9.13, the :scheduler_precision can be set when - # instantiating the scheduler. + # The :scheduler_precision can be set when instantiating the scheduler. # # scheduler = OpenWFE::Scheduler.new(:scheduler_precision => 0.500) # scheduler.start # # # # instatiates a scheduler that checks its jobs twice per second # # (the default is 4 times per second (0.250)) # + # + # Since OpenWFEru 0.9.16, tags can be attached to jobs scheduled : + # + # scheduler.schedule_in "2h", :tags => "backup" do + # init_backup_sequence() + # end + # + # scheduler.schedule "0 24 * * *", :tags => "new_day" do + # do_this_or_that() + # end + # + # jobs = find_jobs 'backup' + # + # Multiple tags may be attached to a single job : + # + # scheduler.schedule_in "2h", :tags => [ "backup", "important" ] do + # init_backup_sequence() + # end + # + # The vanilla case for tags assume they are String instances, but nothing + # prevents you from using anything else. The scheduler has no persistence + # by itself, so no serialization issue. + # class Scheduler include MonitorMixin # # By default, the precision is 0.250, with means the scheduler @@ -129,11 +151,11 @@ def initialize (params={}) super() @pending_jobs = [] - @cron_entries = {} + @cron_jobs = {} @scheduler_thread = nil @precision = 0.250 # every 250ms, the scheduler wakes up (default value) @@ -159,12 +181,15 @@ @stopped = false @scheduler_thread = Thread.new do if defined?(JRUBY_VERSION) + require 'java' - java.lang.Thread.current_thread.name = "openwferu scheduler (Ruby Thread)" + + java.lang.Thread.current_thread.name = \ + "openwferu scheduler (Ruby Thread)" end while true break if @stopped step @@ -296,11 +321,11 @@ # def unschedule (job_id) synchronize do for i in 0...@pending_jobs.length - if @pending_jobs[i].eid == job_id + if @pending_jobs[i].job_id == job_id @pending_jobs.delete_at i return true end end @@ -311,12 +336,12 @@ # # Unschedules a cron job # def unschedule_cron_job (job_id) synchronize do - if @cron_entries.has_key?(job_id) - @cron_entries.delete job_id + if @cron_jobs.has_key?(job_id) + @cron_jobs.delete job_id return true end false end end @@ -357,32 +382,34 @@ cron_id = params[:cron_id] cron_id = params[:job_id] unless cron_id unschedule(cron_id) if cron_id + tags = params[:tags] + # # schedule b = to_block(params, &block) - entry = CronEntry.new(cron_id, cron_line, &b) - @cron_entries[entry.eid] = entry + job = CronJob.new(cron_id, cron_line, tags, &b) + @cron_jobs[job.job_id] = job - entry.eid + job.job_id end end # - # Returns the job corresponding to job_id, an instance of AtEntry - # or CronEntry will be returned. + # Returns the job corresponding to job_id, an instance of AtJob + # or CronJob will be returned. # def get_job (job_id) - entry = @cron_entries[job_id] - return entry if entry + job = @cron_jobs[job_id] + return job if job - @pending_jobs.find do |entry| - entry.eid == job_id + @pending_jobs.find do |job| + job.job_id == job_id end end # # Finds a job (via get_job()) and then returns the wrapped @@ -392,16 +419,49 @@ return nil unless job_id j = get_job(job_id) - return j.schedulable if j.respond_to? :schedulable + return j.schedulable if j.respond_to?(:schedulable) nil end # + # Returns an array of jobs that have the given tag. + # + def find_jobs (tag) + + result = @cron_jobs.values.find_all do |job| + job.has_tag?(tag) + end + + result + @pending_jobs.find_all do |job| + job.has_tag?(tag) + end + end + + # + # Finds the jobs with the given tag and then returns an array of + # the wrapped Schedulable objects. + # Jobs that haven't a wrapped Schedulable won't be included in the + # result. + # + def find_schedulables (tag) + + jobs = find_jobs(tag) + + result = [] + + jobs.each do |job| + result.push(job.schedulable) if job.respond_to?(:schedulable) + end + + result + end + + # # Returns the number of currently pending jobs in this scheduler # ('at' jobs and 'every' jobs). # def pending_job_count @pending_jobs.size @@ -409,25 +469,25 @@ # # Returns the number of cron jobs currently active in this scheduler. # def cron_job_count - @cron_entries.size + @cron_jobs.size end # # Returns the current count of 'every' jobs scheduled. # def every_job_count - @pending_jobs.select { |j| j.is_a?(EveryEntry) }.size + @pending_jobs.select { |j| j.is_a?(EveryJob) }.size end # # Returns the current count of 'at' jobs scheduled (not 'every'). # def at_job_count - @pending_jobs.select { |j| j.instance_of?(AtEntry) }.size + @pending_jobs.select { |j| j.instance_of?(AtJob) }.size end # # Returns true if the given string seems to be a cron string. # @@ -465,19 +525,20 @@ if at.kind_of?(Time) #puts "1 at is '#{at.to_s}' (#{at.class})"}" jobClass = if params[:every] - EveryEntry + EveryJob else - AtEntry + AtJob end job_id = params[:job_id] + tags = params[:tags] b = to_block(params, &block) - job = jobClass.new(at, job_id, &b) + job = jobClass.new(at, job_id, tags, &b) unschedule(job_id) if job_id if at < (Time.new.to_f + @precision) job.trigger() unless params[:discard_past] @@ -559,11 +620,11 @@ @pending_jobs[index, 0] = job end #puts "push() at '#{Time.at(job.at)}'" - job.eid + job.job_id end # # This is the method called each time the scheduler wakes up # (by default 4 times per second). It's meant to quickly @@ -585,24 +646,24 @@ @dont_reschedule_every = true if at_job_count < 1 end # - # cron entries + # cron jobs if now.sec == 0 and now.min != @last_cron_minute # - # only consider cron entries at the second 0 of a + # only consider cron jobs at the second 0 of a # minute @last_cron_minute = now.min - #puts "step() @cron_entries.size #{@cron_entries.size}" + #puts "step() @cron_jobs.size #{@cron_jobs.size}" - @cron_entries.each do |cron_id, cron_entry| + @cron_jobs.each do |cron_id, cron_job| #puts "step() cron_id : #{cron_id}" - trigger(cron_entry) if cron_entry.matches?(now) + trigger(cron_job) if cron_job.matches?(now) end end # # pending jobs @@ -633,15 +694,15 @@ @pending_jobs.delete_at(0) end end end - def trigger (entry) + def trigger (job) Thread.new do begin - entry.trigger + job.trigger rescue Exception => e message = "trigger() caught exception\n" + OpenWFE::exception_to_s(e) if self.respond_to? :lwarn @@ -670,82 +731,94 @@ end protected JOB_ID_LOCK = Monitor.new + # + # would it be better to use a Mutex instead of a full-blown + # Monitor ? - class Entry + class Job @@last_given_id = 0 # # as a scheduler is fully transient, no need to # have persistent ids, a simple counter is sufficient attr_accessor \ - :eid, :block + :job_id, :tags, :block + + def initialize (job_id, tags, &block) - def initialize (entry_id=nil, &block) @block = block - if entry_id - @eid = entry_id + + if job_id + @job_id = job_id else JOB_ID_LOCK.synchronize do - @eid = @@last_given_id - @@last_given_id = @eid + 1 + @job_id = @@last_given_id + @@last_given_id = @job_id + 1 end end + + #@tags = Array(tags).collect { |tag| tag.to_s } + # making sure we have an array of String tags + + @tags = Array(tags) + # any tag is OK end - #def trigger - # @block.call @eid - #end + # + # Returns true if this job sports the given tag + # + def has_tag? (tag) + @tags.include?(tag) + end end - class AtEntry < Entry + class AtJob < Job - attr_accessor \ - :at + attr_accessor :at - def initialize (at, at_id, &block) - super(at_id, &block) + def initialize (at, at_id, tags, &block) + super(at_id, tags, &block) @at = at end def trigger - @block.call @eid, @at + @block.call @job_id, @at end end - class EveryEntry < AtEntry + class EveryJob < AtJob end - class CronEntry < Entry + class CronJob < Job - attr_accessor \ - :cron_line + attr_accessor :cron_line - def initialize (cron_id, line, &block) + def initialize (cron_id, line, tags, &block) - super(cron_id, &block) + super(cron_id, tags, &block) if line.kind_of?(String) @cron_line = CronLine.new(line) elsif line.kind_of?(CronLine) @cron_line = line else raise \ - "Cannot initialize a CronEntry " + + "Cannot initialize a CronJob " + "with a param of class #{line.class}" end end def matches? (time) @cron_line.matches? time end def trigger - @block.call @eid, @cron_line + @block.call @job_id, @cron_line end end # # A 'cron line' is a line in the sense of a crontab @@ -791,11 +864,11 @@ return false if no_match?(time.hour, @hours) return false if no_match?(time.day, @days) return false if no_match?(time.month, @months) return false if no_match?(time.wday, @weekdays) - return true + true end # # Returns an array of 5 arrays (minutes, hours, days, months, # weekdays). @@ -833,11 +906,11 @@ WDS.each_with_index do |day, index| item = item.gsub(day, "#{index+1}") end - return parse_item(item, 1, 7) + parse_item(item, 1, 7) end def parse_item (item, min, max) return nil \ @@ -850,11 +923,11 @@ i = Integer(item) i = min if i < min i = max if i > max - return [ i ] + [ i ] end def parse_list (item, min, max) items = item.split(",") result = [] @@ -862,11 +935,11 @@ i = Integer(i) i = min if i < min i = max if i > max result << i end - return result + result end def parse_range (item, min, max) i = item.index("-") j = item.index("/") @@ -903,21 +976,21 @@ result << value value = value + inc break if value > iend end - return result + result end def no_match? (value, cron_values) return false if not cron_values cron_values.each do |v| return false if value == v end - return true + true end end end