lib/openwfe/util/scheduler.rb in openwferu-scheduler-0.9.15.1172 vs lib/openwfe/util/scheduler.rb in openwferu-scheduler-0.9.16
- old
+ new
@@ -296,43 +296,43 @@
#
# Schedules a job in a loop. After an execution, it will not execute
# before the time specified in 'freq'.
#
- # Note that if your job takes 2s to execute and the freq is set to
- # 10s, it will in fact execute every 12s.
- # You can however wrap the code within its own thread :
- #
- # scheduler.schedule_every("12s") do
- # Thread.new do
- # do_the_job()
- # end
- # end
- #
# This method returns a job identifier which can be used to unschedule()
# the job.
#
def schedule_every (freq, params={}, &block)
f = duration_to_f freq
params = prepare_params params
schedulable = params[:schedulable]
- params[:every] = true
+ params[:every] = freq
- sschedule_at Time.new.to_f + f, params do |job_id, at|
+ last_at = params[:last_at]
+ next_at = if last_at
+ last_at + f
+ else
+ Time.now.to_f + f
+ end
- params[:job_id] = job_id
+ sschedule_at next_at, params do |job_id, at|
if schedulable
schedulable.trigger(params)
else
block.call job_id, at
end
+
+ params[:job_id] = job_id
+ params[:last_at] = at
- schedule_every(f, params, &block) \
+ schedule_every(freq, params, &block) \
unless @dont_reschedule_every
+ #
+ # yes, this is a kind of recursion
job_id
end
end
@@ -403,17 +403,15 @@
cron_id = params[:cron_id]
cron_id = params[:job_id] unless cron_id
unschedule(cron_id) if cron_id
- tags = params[:tags]
-
#
# schedule
b = to_block(params, &block)
- job = CronJob.new(self, cron_id, cron_line, tags, &b)
+ job = CronJob.new(self, cron_id, cron_line, params, &b)
@cron_jobs[job.job_id] = job
job.job_id
end
end
@@ -425,12 +423,14 @@
def get_job (job_id)
job = @cron_jobs[job_id]
return job if job
- @pending_jobs.find do |job|
- job.job_id == job_id
+ synchronize do
+ @pending_jobs.find do |job|
+ job.job_id == job_id
+ end
end
end
#
# Finds a job (via get_job()) and then returns the wrapped
@@ -454,12 +454,14 @@
result = @cron_jobs.values.find_all do |job|
job.has_tag?(tag)
end
- result + @pending_jobs.find_all do |job|
- job.has_tag?(tag)
+ synchronize do
+ result + @pending_jobs.find_all do |job|
+ job.has_tag?(tag)
+ end
end
end
#
# Finds the jobs with the given tag and then returns an array of
@@ -552,15 +554,15 @@
else
AtJob
end
job_id = params[:job_id]
- tags = params[:tags]
b = to_block(params, &block)
- job = jobClass.new(self, at, job_id, tags, &b)
+ job = jobClass.new(self, at, job_id, params, &b)
+
unschedule(job_id) if job_id
if at < (Time.new.to_f + @precision)
job.trigger() unless params[:discard_past]
return nil
@@ -668,10 +670,12 @@
@dont_reschedule_every = true if at_job_count < 1
end
# TODO : eventually consider running cron / pending
# job triggering in two different threads
+ #
+ # but well... there's the synchronization issue...
#
# cron jobs
if now.sec != @last_cron_second
@@ -708,17 +712,25 @@
#if job.at <= now
#
# obviously
- trigger(job)
+ trigger job
@pending_jobs.delete_at(0)
end
end
end
+ #
+ # Triggers the job (in a dedicated thread).
+ #
+ # If an error occurs in the job, it well get caught and an error
+ # message will be displayed to STDOUT.
+ # If this scheduler provides a lwarn(message) method, it will
+ # be used insted.
+ #
def trigger (job)
Thread.new do
begin
job.trigger
@@ -756,22 +768,48 @@
JOB_ID_LOCK = Monitor.new
#
# would it be better to use a Mutex instead of a full-blown
# Monitor ?
+ #
+ # The parent class for scheduled jobs.
+ #
class Job
@@last_given_id = 0
#
# as a scheduler is fully transient, no need to
# have persistent ids, a simple counter is sufficient
- attr_accessor :job_id, :tags, :block
+ #
+ # The identifier for the job
+ #
+ attr_accessor :job_id
+
+ #
+ # An array of tags
+ #
+ attr_accessor :tags
+
+ #
+ # The block to execute at trigger time
+ #
+ attr_accessor :block
+
+ #
+ # A reference to the scheduler
+ #
attr_reader :scheduler
+
+ #
+ # Keeping a copy of the initialization params of the job.
+ #
+ attr_reader :params
- def initialize (scheduler, job_id, tags, &block)
+ def initialize (scheduler, job_id, params, &block)
+
@scheduler = scheduler
@block = block
if job_id
@job_id = job_id
@@ -780,83 +818,156 @@
@job_id = @@last_given_id
@@last_given_id = @job_id + 1
end
end
+ @params = params
+
#@tags = Array(tags).collect { |tag| tag.to_s }
# making sure we have an array of String tags
- @tags = Array(tags)
+ @tags = Array(params[:tags])
# any tag is OK
end
#
# Returns true if this job sports the given tag
#
def has_tag? (tag)
+
@tags.include?(tag)
end
#
# Removes (cancels) this job from its scheduler.
#
def unschedule
+
@scheduler.unschedule(@job_id)
end
end
+ #
+ # An 'at' job.
class AtJob < Job
+ #
+ # The float representation (Time.to_f) of the time at which
+ # the job should be triggered.
+ #
attr_accessor :at
- def initialize (scheduler, at, at_id, tags, &block)
- super(scheduler, at_id, tags, &block)
+ #
+ # The constructor.
+ #
+ def initialize (scheduler, at, at_id, params, &block)
+
+ super(scheduler, at_id, params, &block)
@at = at
end
+ #
+ # Triggers the job (calls the block)
+ #
def trigger
+
@block.call @job_id, @at
end
+
+ #
+ # Returns the Time instance at which this job is scheduled.
+ #
+ def schedule_info
+
+ Time.at(@at)
+ end
end
+ #
+ # An 'every' job is simply an extension of an 'at' job.
+ #
class EveryJob < AtJob
+
+ #
+ # Returns the frequency string used to schedule this EveryJob,
+ # like for example "3d" or "1M10d3h".
+ #
+ def schedule_info
+
+ @params[:every]
+ end
end
+ #
+ # A cron job.
+ #
class CronJob < Job
+ #
+ # The CronLine instance representing the times at which
+ # the cron job has to be triggered.
+ #
attr_accessor :cron_line
- def initialize (scheduler, cron_id, line, tags, &block)
+ def initialize (scheduler, cron_id, line, params, &block)
- super(scheduler, cron_id, tags, &block)
+ super(scheduler, cron_id, params, &block)
- if line.kind_of?(String)
+ if line.is_a?(String)
+
@cron_line = CronLine.new(line)
- elsif line.kind_of?(CronLine)
+
+ elsif line.is_a?(CronLine)
+
@cron_line = line
+
else
+
raise \
"Cannot initialize a CronJob " +
"with a param of class #{line.class}"
end
end
+ #
+ # This is the method called by the scheduler to determine if it
+ # has to fire this CronJob instance.
+ #
def matches? (time)
+
@cron_line.matches? time
end
+ #
+ # As the name implies.
+ #
def trigger
+
@block.call @job_id, @cron_line
end
+
+ #
+ # Returns the original cron tab string used to schedule this
+ # Job. Like for example "60/3 * * * Sun".
+ #
+ def schedule_info
+
+ @cron_line.original
+ end
end
#
# A 'cron line' is a line in the sense of a crontab
# (man 5 crontab) file line.
#
class CronLine
+ #
+ # The string used for creating this cronline instance.
+ #
+ attr_reader :original
+
attr_reader \
:seconds,
:minutes,
:hours,
:days,
@@ -865,10 +976,12 @@
def initialize (line)
super()
+ @original = line
+
items = line.split
unless [ 5, 6 ].include?(items.length)
raise \
"cron '#{line}' string should hold 5 or 6 items, " +
@@ -889,10 +1002,13 @@
@weekdays = parse_weekdays(items[4+offset])
#adjust_arrays()
end
+ #
+ # Returns true if the given time matches this cron line.
+ #
def matches? (time)
if time.kind_of?(Float) or time.kind_of?(Integer)
time = Time.at(time)
end
@@ -916,11 +1032,11 @@
[ @seconds, @minutes, @hours, @days, @months, @weekdays ]
end
private
- #
+ #--
# adjust values to Ruby
#
#def adjust_arrays()
# @hours = @hours.collect { |h|
# if h == 24
@@ -932,10 +1048,11 @@
# @weekdays = @weekdays.collect { |wd|
# wd - 1
# } if @weekdays
#end
#
- # dead code
+ # dead code, keeping as a reminder
+ #++
WDS = [ "mon", "tue", "wed", "thu", "fri", "sat", "sun" ]
#
# used by parse_weekday()