lib/openwfe/util/scheduler.rb in openwferu-scheduler-0.9.16 vs lib/openwfe/util/scheduler.rb in openwferu-scheduler-0.9.16.1404
- old
+ new
@@ -51,11 +51,11 @@
#
# schedule_at() and schedule() await either a Schedulable instance and
# params (usually an array or nil), either a block, which is more in the
# Ruby way.
#
- # Some examples :
+ # == Examples
#
# scheduler.schedule_in("3d") do
# regenerate_monthly_report()
# end
# #
@@ -118,10 +118,12 @@
# #
# # instatiates a scheduler that checks its jobs twice per second
# # (the default is 4 times per second (0.250))
#
#
+ # == Tags
+ #
# Since OpenWFEru 0.9.16, tags can be attached to jobs scheduled :
#
# scheduler.schedule_in "2h", :tags => "backup" do
# init_backup_sequence()
# end
@@ -142,20 +144,71 @@
# The vanilla case for tags assume they are String instances, but nothing
# prevents you from using anything else. The scheduler has no persistence
# by itself, so no serialization issue.
#
#
+ # == Cron up to the second
+ #
# Since OpenWFEru 0.9.16, a cron schedule can be set at the second level :
#
# scheduler.schedule "7 * * * * *" do
# puts "it's now the seventh second of the minute"
# end
#
# The OpenWFEru scheduler recognizes an optional first column for second
# scheduling. This column can, like for the other columns, specify a
# value ("7"), a list of values ("7,8,9,27") or a range ("7-12").
#
+ # == Exceptions
+ #
+ # The OpenWFEru scheduler will output a stacktrace to the STDOUT in
+ # case of exception. There are two ways to change that behaviour.
+ #
+ # # 1 - providing a lwarn method to the scheduler instance :
+ #
+ # class << scheduler
+ # def lwarn (&block)
+ # puts "oops, something wrong happened : "
+ # puts block.call
+ # end
+ # end
+ #
+ # # 2 - overriding the [protected] method log_exception(e) :
+ #
+ # class << scheduler
+ # def log_exception (e)
+ # puts "something wrong happened : "+e.to_s
+ # end
+ # end
+ #
+ # == 'Every jobs' and rescheduling
+ #
+ # Every jobs can reschedule/unschedule themselves. A reschedule example :
+ #
+ # schedule.schedule_every "5h" do |job_id, at, params|
+ #
+ # mails = $inbox.fetch_mails
+ # mails.each { |m| $inbox.mark_as_spam(m) if is_spam(m) }
+ #
+ # params[:every] = if mails.size > 100
+ # "1h" # lots of spam, check every hour
+ # else
+ # "5h" # normal schedule, every 5 hours
+ # end
+ # end
+ #
+ # Unschedule example :
+ #
+ # schedule.schedule_every "10s" do |job_id, at, params|
+ # #
+ # # polls every 10 seconds until a mail arrives
+ #
+ # $mail = $inbox.fetch_last_mail
+ #
+ # params[:dont_reschedule] = true if $mail
+ # end
+ #
class Scheduler
include MonitorMixin
#
# By default, the precision is 0.250, with means the scheduler
@@ -209,11 +262,11 @@
java.lang.Thread.current_thread.name = \
"openwferu scheduler (Ruby Thread)"
end
- while true
+ loop do
break if @stopped
step
sleep @precision
end
end
@@ -299,10 +352,18 @@
# before the time specified in 'freq'.
#
# This method returns a job identifier which can be used to unschedule()
# the job.
#
+ # In case of exception in the job, it will be rescheduled. If you don't
+ # want the job to be rescheduled, set the parameter :try_again to false.
+ #
+ # scheduler.schedule_every "500", :try_again => false do
+ # do_some_prone_to_error_stuff()
+ # # won't get rescheduled in base of exception
+ # end
+ #
def schedule_every (freq, params={}, &block)
f = duration_to_f freq
params = prepare_params params
@@ -316,24 +377,62 @@
Time.now.to_f + f
end
sschedule_at next_at, params do |job_id, at|
- if schedulable
- schedulable.trigger(params)
- else
- block.call job_id, at
+ #
+ # trigger ...
+
+ hit_exception = false
+
+ begin
+
+ if schedulable
+ schedulable.trigger params
+ else
+ block.call job_id, at, params
+ end
+
+ rescue Exception => e
+
+ log_exception e
+
+ hit_exception = true
end
- params[:job_id] = job_id
- params[:last_at] = at
-
- schedule_every(freq, params, &block) \
- unless @dont_reschedule_every
+ # cannot use a return here !!! (block)
+
+ #unless (hit_exception and params[:try_again] == false)
+ # #
+ # # reschedule ...
+ # params[:job_id] = job_id
+ # params[:last_at] = at
+ #
+ # schedule_every(params[:every], params, &block) \
+ # unless @dont_reschedule_every
+ # #
+ # # yes, this is a kind of recursion
+ #end
+ unless \
+ @dont_reschedule_every or
+ (params[:dont_reschedule] == true) or
+ (hit_exception and params[:try_again] == false)
+
+ #
+ # ok, reschedule ...
+
+ params[:job_id] = job_id
+ params[:last_at] = at
+
+ schedule_every params[:every], params, &block
#
# yes, this is a kind of recursion
+ # note that params[:every] might have been changed
+ # by the block/schedulable code
+ end
+
job_id
end
end
#
@@ -522,10 +621,11 @@
#
# Making sure that params is a Hash.
#
def prepare_params (params)
+
params = { :schedulable => params } \
if params.is_a?(Schedulable)
params
end
@@ -593,10 +693,11 @@
# duration_to_f("10s")
#
# will yields 10.0
#
def duration_to_f (s)
+
return s if s.kind_of? Float
return OpenWFE::parse_time_string(s) if s.kind_of? String
Float(s.to_s)
end
@@ -695,11 +796,11 @@
now = now.to_f
#
# that's what at jobs do understand
- while true
+ loop do
#puts "step() job.count is #{@pending_jobs.length}"
break if @pending_jobs.length < 1
@@ -714,40 +815,53 @@
#
# obviously
trigger job
- @pending_jobs.delete_at(0)
+ @pending_jobs.delete_at 0
end
end
end
#
# Triggers the job (in a dedicated thread).
#
- # If an error occurs in the job, it well get caught and an error
- # message will be displayed to STDOUT.
- # If this scheduler provides a lwarn(message) method, it will
- # be used insted.
- #
def trigger (job)
Thread.new do
begin
+
job.trigger
+
rescue Exception => e
- message =
- "trigger() caught exception\n" +
- OpenWFE::exception_to_s(e)
- if self.respond_to? :lwarn
- lwarn { message }
- else
- puts message
- end
+
+ log_exception e
end
end
end
+
+ #
+ # If an error occurs in the job, it well get caught and an error
+ # message will be displayed to STDOUT.
+ # If this scheduler provides a lwarn(message) method, it will
+ # be used insted.
+ #
+ # Of course, one can override this method.
+ #
+ def log_exception (e)
+
+ message =
+ "trigger() caught exception\n" +
+ e.to_s + "\n" +
+ e.backtrace.join("\n")
+
+ if self.respond_to?(:lwarn)
+ lwarn { message }
+ else
+ puts message
+ end
+ end
end
#
# This module adds a trigger method to any class that includes it.
# The default implementation feature here triggers an exception.
@@ -1007,13 +1121,12 @@
#
# Returns true if the given time matches this cron line.
#
def matches? (time)
- if time.kind_of?(Float) or time.kind_of?(Integer)
- time = Time.at(time)
- end
+ time = Time.at(time) \
+ if time.kind_of?(Float) or time.kind_of?(Integer)
return false if no_match?(time.sec, @seconds)
return false if no_match?(time.min, @minutes)
return false if no_match?(time.hour, @hours)
return false if no_match?(time.day, @days)
@@ -1130,10 +1243,10 @@
iend = max if iend > max
result = []
value = istart
- while true
+ loop do
result << value
value = value + inc
break if value > iend
end