lib/openwfe/util/scheduler.rb in openwferu-0.9.7 vs lib/openwfe/util/scheduler.rb in openwferu-0.9.8
- old
+ new
@@ -89,15 +89,24 @@
#
# scheduler.schedule_every("1h20m") do
# regenerate_latest_report()
# end
#
+ # The scheduler has a "exit_when_no_more_jobs" attribute. When set to
+ # 'true', the scheduler will exit as soon as there are no more jobs to
+ # run.
+ # Use with care though, if you create a scheduler, set this attribute
+ # to true and start the scheduler, the scheduler will immediately exit.
+ # This attribute is best used indirectly : the method
+ # join_until_no_more_jobs() wraps it.
+ #
class Scheduler
include MonitorMixin
attr_accessor \
- :precision
+ :precision,
+ :exit_when_no_more_jobs
def initialize
super()
@@ -108,10 +117,13 @@
@precision = 0.250
#
# every 250ms, the scheduler wakes up
+ @exit_when_no_more_jobs = false
+ @dont_reschedule_every = false
+
@last_cron_minute = -1
@stopped = false
end
@@ -121,90 +133,59 @@
def sstart
@scheduler_thread = Thread.new do
while true
break if @stopped
- #print "."; $stdout.flush
step
sleep(@precision)
end
end
end
#
# The scheduler is stoppable via sstop()
#
def sstop
+
@stopped = true
end
alias :start :sstart
alias :stop :sstop
#
# Joins on the scheduler thread
#
def join
+
@scheduler_thread.join
end
#
- # Schedules a job by specifying at which time it should trigger.
- # Returns the a job_id that can be used to unschedule the job.
+ # Like join() but takes care of setting the 'exit_when_no_more_jobs'
+ # attribute of this scheduler to true before joining.
+ # Thus the scheduler will exit (and the join terminates) as soon as
+ # there aren't no more 'at' (or 'every') jobs in the scheduler.
#
- def schedule_at (at, schedulable=nil, params=nil, &block)
+ # Currently used only in unit tests.
+ #
+ def join_until_no_more_jobs
- schedule_at_with_id(at, nil, schedulable, params, &block)
+ @exit_when_no_more_jobs = true
+ join
end
#
- # Do not call directly.
+ # Schedules a job by specifying at which time it should trigger.
+ # Returns the a job_id that can be used to unschedule the job.
#
- def schedule_at_with_id (at, at_id, schedulable=nil, params=nil, &block)
- synchronize do
+ def schedule_at (at, schedulable=nil, params=nil, &block)
- #puts "0 at is '#{at.to_s}' (#{at.class})"
-
- at = OpenWFE::to_ruby_time(at) \
- if at.kind_of? String
-
- at = OpenWFE::to_gm_time(at) \
- if at.kind_of? DateTime
-
- at = at.to_f \
- if at.kind_of? Time
-
- #puts "1 at is '#{at.to_s}' (#{at.class})"}"
-
- b = to_block(schedulable, params, &block)
- job = AtEntry.new(at, at_id, &b)
-
- if at < (Time.new.to_f + @precision)
- job.trigger()
- return nil
- end
-
- return push(job) \
- if @pending_jobs.length < 1
-
- # shortcut : check if the new job is posterior to
- # the last job pending
-
- return push(job) \
- if at >= @pending_jobs.last.at
-
- for i in 0...@pending_jobs.length
- if at <= @pending_jobs[i].at
- return push(job, i)
- end
- end
-
- return push(job)
- end
+ sschedule_at(false, at, nil, schedulable, params, &block)
end
- protected :schedule_at_with_id
+
#
# Schedules a job by stating in how much time it should trigger.
# Returns the a job_id that can be used to unschedule the job.
#
def schedule_in (duration, schedulable=nil, params=nil, &block)
@@ -229,34 +210,14 @@
# end
# end
#
def schedule_every (freq, schedulable=nil, params=nil, &block)
- schedule_every_with_id(freq, nil, schedulable, params, &block)
+ sschedule_every(freq, nil, schedulable, params, &block)
end
#
- # Do not call directly.
- #
- def schedule_every_with_id (freq, at_id, schedulable, params, &block)
-
- f = duration_to_f(freq)
-
- job_id = schedule_at_with_id(Time.new.to_f + f, at_id) do |eid, at|
- if schedulable
- schedulable.trigger(params)
- else
- block.call eid, at
- end
- schedule_every_with_id(f, eid, schedulable, params, &block)
- end
-
- job_id
- end
- protected :schedule_every_with_id
-
- #
# Unschedules an 'at' or a 'cron' job identified by the id
# it was given at schedule time.
#
def unschedule (job_id)
synchronize do
@@ -315,16 +276,14 @@
synchronize do
#
# is a job with the same id already scheduled ?
- if cron_id
- if unschedule(cron_id)
- ldebug do
- "schedule() unscheduled previous job "+
- "under same name '#{cron_id}'"
- end
+ if cron_id and unschedule(cron_id)
+ ldebug do
+ "schedule() unscheduled previous job "+
+ "under same name '#{cron_id}'"
end
end
#
# schedule
@@ -366,11 +325,12 @@
return j.schedulable if j.respond_to? :schedulable
return nil
end
#
- # Returns the number of currently pending jobs in this scheduler.
+ # Returns the number of currently pending jobs in this scheduler
+ # ('at' jobs and 'every' jobs).
#
def pending_job_count
@pending_jobs.size
end
@@ -380,18 +340,100 @@
def cron_job_count
@cron_entries.size
end
#
+ # Returns the current count of 'every' jobs scheduled.
+ #
+ def every_job_count
+ @pending_jobs.select { |j| j.is_a?(EveryEntry) }.size
+ end
+
+ #
+ # Returns the current count of 'at' jobs scheduled (not 'every').
+ #
+ def at_job_count
+ @pending_jobs.select { |j| j.instance_of?(AtEntry) }.size
+ end
+
+ #
# Returns true if the given string seems to be a cron string.
#
def Scheduler.is_cron_string (s)
return s.match(".+ .+ .+ .+ .+")
end
protected
+ def sschedule_at (
+ is_every, at, at_id, schedulable=nil, params=nil, &block)
+
+ synchronize do
+
+ #puts "0 at is '#{at.to_s}' (#{at.class})"
+
+ at = OpenWFE::to_ruby_time(at) \
+ if at.kind_of? String
+
+ at = OpenWFE::to_gm_time(at) \
+ if at.kind_of? DateTime
+
+ at = at.to_f \
+ if at.kind_of? Time
+
+ #puts "1 at is '#{at.to_s}' (#{at.class})"}"
+
+ jobClass = AtEntry
+ jobClass = EveryEntry if is_every
+
+ b = to_block(schedulable, params, &block)
+ job = jobClass.new(at, at_id, &b)
+
+ if at < (Time.new.to_f + @precision)
+ job.trigger()
+ return nil
+ end
+
+ return push(job) \
+ if @pending_jobs.length < 1
+
+ # shortcut : check if the new job is posterior to
+ # the last job pending
+
+ return push(job) \
+ if at >= @pending_jobs.last.at
+
+ for i in 0...@pending_jobs.length
+ if at <= @pending_jobs[i].at
+ return push(job, i)
+ end
+ end
+
+ return push(job)
+ end
+ end
+
+ def sschedule_every (freq, at_id, schedulable, params, &block)
+
+ f = duration_to_f(freq)
+
+ job_id = sschedule_at(
+ true, Time.new.to_f + f, at_id) do |eid, at|
+
+ if schedulable
+ schedulable.trigger(params)
+ else
+ block.call eid, at
+ end
+
+ sschedule_every(f, eid, schedulable, params, &block) \
+ unless @dont_reschedule_every
+ end
+
+ job_id
+ end
+
#
# Ensures that a duration is a expressed as a Float instance.
#
# duration_to_f("10s")
#
@@ -446,13 +488,25 @@
# determine if there are jobs to trigger else to get back to sleep.
# 'cron' jobs get executed if necessary then 'at' jobs.
#
def step
synchronize do
+
now = Time.new
minute = now.min
+ if @exit_when_no_more_jobs
+
+ if @pending_jobs.size < 1
+
+ @stopped = true
+ return
+ end
+
+ @dont_reschedule_every = true if at_job_count < 1
+ end
+
#
# cron entries
if now.sec == 0 and minute > @last_cron_minute
#
@@ -573,9 +627,12 @@
end
def trigger
@block.call @eid, @at
end
+ end
+
+ class EveryEntry < AtEntry
end
class CronEntry < Entry
attr_accessor \