lib/openwfe/util/scheduler.rb in openwferu-0.9.7 vs lib/openwfe/util/scheduler.rb in openwferu-0.9.8

- old
+ new

@@ -89,15 +89,24 @@ # # scheduler.schedule_every("1h20m") do # regenerate_latest_report() # end # + # The scheduler has a "exit_when_no_more_jobs" attribute. When set to + # 'true', the scheduler will exit as soon as there are no more jobs to + # run. + # Use with care though, if you create a scheduler, set this attribute + # to true and start the scheduler, the scheduler will immediately exit. + # This attribute is best used indirectly : the method + # join_until_no_more_jobs() wraps it. + # class Scheduler include MonitorMixin attr_accessor \ - :precision + :precision, + :exit_when_no_more_jobs def initialize super() @@ -108,10 +117,13 @@ @precision = 0.250 # # every 250ms, the scheduler wakes up + @exit_when_no_more_jobs = false + @dont_reschedule_every = false + @last_cron_minute = -1 @stopped = false end @@ -121,90 +133,59 @@ def sstart @scheduler_thread = Thread.new do while true break if @stopped - #print "."; $stdout.flush step sleep(@precision) end end end # # The scheduler is stoppable via sstop() # def sstop + @stopped = true end alias :start :sstart alias :stop :sstop # # Joins on the scheduler thread # def join + @scheduler_thread.join end # - # Schedules a job by specifying at which time it should trigger. - # Returns the a job_id that can be used to unschedule the job. + # Like join() but takes care of setting the 'exit_when_no_more_jobs' + # attribute of this scheduler to true before joining. + # Thus the scheduler will exit (and the join terminates) as soon as + # there aren't no more 'at' (or 'every') jobs in the scheduler. # - def schedule_at (at, schedulable=nil, params=nil, &block) + # Currently used only in unit tests. + # + def join_until_no_more_jobs - schedule_at_with_id(at, nil, schedulable, params, &block) + @exit_when_no_more_jobs = true + join end # - # Do not call directly. + # Schedules a job by specifying at which time it should trigger. + # Returns the a job_id that can be used to unschedule the job. # - def schedule_at_with_id (at, at_id, schedulable=nil, params=nil, &block) - synchronize do + def schedule_at (at, schedulable=nil, params=nil, &block) - #puts "0 at is '#{at.to_s}' (#{at.class})" - - at = OpenWFE::to_ruby_time(at) \ - if at.kind_of? String - - at = OpenWFE::to_gm_time(at) \ - if at.kind_of? DateTime - - at = at.to_f \ - if at.kind_of? Time - - #puts "1 at is '#{at.to_s}' (#{at.class})"}" - - b = to_block(schedulable, params, &block) - job = AtEntry.new(at, at_id, &b) - - if at < (Time.new.to_f + @precision) - job.trigger() - return nil - end - - return push(job) \ - if @pending_jobs.length < 1 - - # shortcut : check if the new job is posterior to - # the last job pending - - return push(job) \ - if at >= @pending_jobs.last.at - - for i in 0...@pending_jobs.length - if at <= @pending_jobs[i].at - return push(job, i) - end - end - - return push(job) - end + sschedule_at(false, at, nil, schedulable, params, &block) end - protected :schedule_at_with_id + # # Schedules a job by stating in how much time it should trigger. # Returns the a job_id that can be used to unschedule the job. # def schedule_in (duration, schedulable=nil, params=nil, &block) @@ -229,34 +210,14 @@ # end # end # def schedule_every (freq, schedulable=nil, params=nil, &block) - schedule_every_with_id(freq, nil, schedulable, params, &block) + sschedule_every(freq, nil, schedulable, params, &block) end # - # Do not call directly. - # - def schedule_every_with_id (freq, at_id, schedulable, params, &block) - - f = duration_to_f(freq) - - job_id = schedule_at_with_id(Time.new.to_f + f, at_id) do |eid, at| - if schedulable - schedulable.trigger(params) - else - block.call eid, at - end - schedule_every_with_id(f, eid, schedulable, params, &block) - end - - job_id - end - protected :schedule_every_with_id - - # # Unschedules an 'at' or a 'cron' job identified by the id # it was given at schedule time. # def unschedule (job_id) synchronize do @@ -315,16 +276,14 @@ synchronize do # # is a job with the same id already scheduled ? - if cron_id - if unschedule(cron_id) - ldebug do - "schedule() unscheduled previous job "+ - "under same name '#{cron_id}'" - end + if cron_id and unschedule(cron_id) + ldebug do + "schedule() unscheduled previous job "+ + "under same name '#{cron_id}'" end end # # schedule @@ -366,11 +325,12 @@ return j.schedulable if j.respond_to? :schedulable return nil end # - # Returns the number of currently pending jobs in this scheduler. + # Returns the number of currently pending jobs in this scheduler + # ('at' jobs and 'every' jobs). # def pending_job_count @pending_jobs.size end @@ -380,18 +340,100 @@ def cron_job_count @cron_entries.size end # + # Returns the current count of 'every' jobs scheduled. + # + def every_job_count + @pending_jobs.select { |j| j.is_a?(EveryEntry) }.size + end + + # + # Returns the current count of 'at' jobs scheduled (not 'every'). + # + def at_job_count + @pending_jobs.select { |j| j.instance_of?(AtEntry) }.size + end + + # # Returns true if the given string seems to be a cron string. # def Scheduler.is_cron_string (s) return s.match(".+ .+ .+ .+ .+") end protected + def sschedule_at ( + is_every, at, at_id, schedulable=nil, params=nil, &block) + + synchronize do + + #puts "0 at is '#{at.to_s}' (#{at.class})" + + at = OpenWFE::to_ruby_time(at) \ + if at.kind_of? String + + at = OpenWFE::to_gm_time(at) \ + if at.kind_of? DateTime + + at = at.to_f \ + if at.kind_of? Time + + #puts "1 at is '#{at.to_s}' (#{at.class})"}" + + jobClass = AtEntry + jobClass = EveryEntry if is_every + + b = to_block(schedulable, params, &block) + job = jobClass.new(at, at_id, &b) + + if at < (Time.new.to_f + @precision) + job.trigger() + return nil + end + + return push(job) \ + if @pending_jobs.length < 1 + + # shortcut : check if the new job is posterior to + # the last job pending + + return push(job) \ + if at >= @pending_jobs.last.at + + for i in 0...@pending_jobs.length + if at <= @pending_jobs[i].at + return push(job, i) + end + end + + return push(job) + end + end + + def sschedule_every (freq, at_id, schedulable, params, &block) + + f = duration_to_f(freq) + + job_id = sschedule_at( + true, Time.new.to_f + f, at_id) do |eid, at| + + if schedulable + schedulable.trigger(params) + else + block.call eid, at + end + + sschedule_every(f, eid, schedulable, params, &block) \ + unless @dont_reschedule_every + end + + job_id + end + # # Ensures that a duration is a expressed as a Float instance. # # duration_to_f("10s") # @@ -446,13 +488,25 @@ # determine if there are jobs to trigger else to get back to sleep. # 'cron' jobs get executed if necessary then 'at' jobs. # def step synchronize do + now = Time.new minute = now.min + if @exit_when_no_more_jobs + + if @pending_jobs.size < 1 + + @stopped = true + return + end + + @dont_reschedule_every = true if at_job_count < 1 + end + # # cron entries if now.sec == 0 and minute > @last_cron_minute # @@ -573,9 +627,12 @@ end def trigger @block.call @eid, @at end + end + + class EveryEntry < AtEntry end class CronEntry < Entry attr_accessor \