lib/rufus/scheduler.rb in rufus-scheduler-1.0.7 vs lib/rufus/scheduler.rb in rufus-scheduler-1.0.8
- old
+ new
@@ -29,12 +29,12 @@
#
require 'thread'
require 'monitor'
require 'rufus/otime'
+require 'rufus/cronline'
-
module Rufus
#
# The Scheduler is used by OpenWFEru for registering 'at' and 'cron' jobs.
# 'at' jobs to execute once at a given point in time. 'cron' jobs
@@ -43,11 +43,11 @@
#
# schedule_at() and schedule() await either a Schedulable instance and
# params (usually an array or nil), either a block, which is more in the
# Ruby way.
#
- # == The gem "openwferu-scheduler"
+ # == The gem "rufus-scheduler"
#
# This scheduler was previously known as the "openwferu-scheduler" gem.
#
# To ensure that code tapping the previous gem still runs fine with
# "rufus-scheduler", this new gem has 'pointers' for the old class
@@ -112,10 +112,13 @@
#
# scheduler.schedule_every("1h20m") do
# regenerate_latest_report()
# end
#
+ # (note : a schedule every isn't triggered immediately, thus this example
+ # will first trigger 1 hour and 20 minutes after being scheduled)
+ #
# The scheduler has a "exit_when_no_more_jobs" attribute. When set to
# 'true', the scheduler will exit as soon as there are no more jobs to
# run.
# Use with care though, if you create a scheduler, set this attribute
# to true and start the scheduler, the scheduler will immediately exit.
@@ -285,10 +288,24 @@
# You can specify the name of the scheduler's thread. Should make
# it easier in some debugging situations.
#
# scheduler.new :thread_name => "the crazy scheduler"
#
+ #
+ # == job.trigger_thread
+ #
+ # Since rufus-scheduler 1.0.8, you can have access to the thread of
+ # a job currently being triggered.
+ #
+ # job = scheduler.get_job(job_id)
+ # thread = job.trigger_thread
+ #
+ # This new method will return nil if the job is not currently being
+ # triggered. Not that in case of an every or cron job, this method
+ # will return the thread of the last triggered instance, thus, in case
+ # of overlapping executions, you only get the most recent thread.
+ #
class Scheduler
#
# By default, the precision is 0.250, with means the scheduler
# will check for jobs to execute 4 times per second.
@@ -322,10 +339,11 @@
super()
@pending_jobs = []
@cron_jobs = {}
+ @non_cron_jobs = {}
@schedule_queue = Queue.new
@unschedule_queue = Queue.new
#
# sync between the step() method and the [un]schedule
@@ -344,11 +362,11 @@
@thread_name = params[:thread_name] || "rufus scheduler"
#@correction = 0.00045
@exit_when_no_more_jobs = false
- @dont_reschedule_every = false
+ #@dont_reschedule_every = false
@last_cron_second = -1
@stopped = true
end
@@ -465,27 +483,37 @@
at,
prepare_params(params),
&block)
end
+ #
+ # a shortcut for schedule_at
+ #
+ alias :at :schedule_at
+
#
# Schedules a job by stating in how much time it should trigger.
# Returns the a job_id that can be used to unschedule the job.
#
# This method returns a job identifier which can be used to unschedule()
# the job.
#
def schedule_in (duration, params={}, &block)
do_schedule_at(
- Time.new.to_f + duration_to_f(duration),
+ Time.new.to_f + Rufus::duration_to_f(duration),
prepare_params(params),
&block)
end
#
+ # a shortcut for schedule_in
+ #
+ alias :in :schedule_in
+
+ #
# Schedules a job in a loop. After an execution, it will not execute
# before the time specified in 'freq'.
#
# This method returns a job identifier which can be used to unschedule()
# the job.
@@ -503,81 +531,38 @@
#
# scheduler.schedule_every "2d", :first_in => "5h" do
# # schedule something every two days, start in 5 hours...
# end
#
+ # (without setting a :first_in (or :first_at), our example schedule would
+ # have had been triggered after two days).
+ #
def schedule_every (freq, params={}, &block)
- f = duration_to_f freq
-
params = prepare_params params
- schedulable = params[:schedulable]
params[:every] = freq
- first_at = params.delete :first_at
- first_in = params.delete :first_in
+ first_at = params[:first_at]
+ first_in = params[:first_in]
- previous_at = params[:previous_at]
-
- next_at = if first_at
- first_at
+ first_at = if first_at
+ at_to_f(first_at)
elsif first_in
- Time.now.to_f + duration_to_f(first_in)
- elsif previous_at
- previous_at + f
+ Time.now.to_f + Rufus.duration_to_f(first_in)
else
- Time.now.to_f + f
+ Time.now.to_f + Rufus.duration_to_f(freq) # not triggering immediately
end
- do_schedule_at(next_at, params) do |job_id, at|
-
- #
- # trigger ...
-
- hit_exception = false
-
- begin
-
- if schedulable
- schedulable.trigger params
- else
- block.call job_id, at, params
- end
-
- rescue Exception => e
-
- log_exception e
-
- hit_exception = true
- end
-
- # cannot use a return here !!! (block)
-
- unless \
- @dont_reschedule_every or
- (params[:dont_reschedule] == true) or
- (hit_exception and params[:try_again] == false)
-
- #
- # ok, reschedule ...
-
- params[:job_id] = job_id
- params[:previous_at] = at
-
- schedule_every params[:every], params, &block
- #
- # yes, this is a kind of recursion
-
- # note that params[:every] might have been changed
- # by the block/schedulable code
- end
-
- job_id
- end
+ do_schedule_at(first_at, params, &block)
end
#
+ # a shortcut for schedule_every
+ #
+ alias :every :schedule_every
+
+ #
# Schedules a cron job, the 'cron_line' is a string
# following the Unix cron standard (see "man 5 crontab" in your command
# line, or http://www.google.com/search?q=man%205%20crontab).
#
# For example :
@@ -605,28 +590,30 @@
params = prepare_params(params)
#
# is a job with the same id already scheduled ?
- cron_id = params[:cron_id]
- cron_id = params[:job_id] unless cron_id
+ cron_id = params[:cron_id] || params[:job_id]
- #unschedule(cron_id) if cron_id
- @unschedule_queue << [ :cron, cron_id ]
+ #@unschedule_queue << cron_id
#
# schedule
b = to_block(params, &block)
job = CronJob.new(self, cron_id, cron_line, params, &b)
- #@cron_jobs[job.job_id] = job
@schedule_queue << job
job.job_id
end
+ #
+ # an alias for schedule()
+ #
+ alias :cron :schedule
+
#--
#
# The UNscheduling methods
#
#++
@@ -635,19 +622,21 @@
# Unschedules an 'at' or a 'cron' job identified by the id
# it was given at schedule time.
#
def unschedule (job_id)
- @unschedule_queue << [ :at, job_id ]
+ @unschedule_queue << job_id
end
#
# Unschedules a cron job
#
+ # (deprecated : use unschedule(job_id) for all the jobs !)
+ #
def unschedule_cron_job (job_id)
- @unschedule_queue << [ :cron, job_id ]
+ unschedule(job_id)
end
#--
#
# 'query' methods
@@ -658,35 +647,30 @@
# Returns the job corresponding to job_id, an instance of AtJob
# or CronJob will be returned.
#
def get_job (job_id)
- @cron_jobs[job_id] || @pending_jobs.find { |job| job.job_id == job_id }
+ @cron_jobs[job_id] || @non_cron_jobs[job_id]
end
#
# Finds a job (via get_job()) and then returns the wrapped
# schedulable if any.
#
def get_schedulable (job_id)
- #return nil unless job_id
-
j = get_job(job_id)
-
- return j.schedulable if j.respond_to?(:schedulable)
-
- nil
+ j.respond_to?(:schedulable) ? j.schedulable : nil
end
#
# Returns an array of jobs that have the given tag.
#
def find_jobs (tag)
@cron_jobs.values.find_all { |job| job.has_tag?(tag) } +
- @pending_jobs.find_all { |job| job.has_tag?(tag) }
+ @non_cron_jobs.values.find_all { |job| job.has_tag?(tag) }
end
#
# Finds the jobs with the given tag and then returns an array of
# the wrapped Schedulable objects.
@@ -718,116 +702,110 @@
#
# Returns the current count of 'every' jobs scheduled.
#
def every_job_count
- @pending_jobs.select { |j| j.is_a?(EveryJob) }.size
+ @non_cron_jobs.values.select { |j| j.class == EveryJob }.size
end
#
# Returns the current count of 'at' jobs scheduled (not 'every').
#
def at_job_count
- @pending_jobs.select { |j| j.instance_of?(AtJob) }.size
+ @non_cron_jobs.values.select { |j| j.class == AtJob }.size
end
#
# Returns true if the given string seems to be a cron string.
#
- def Scheduler.is_cron_string (s)
+ def self.is_cron_string (s)
s.match ".+ .+ .+ .+ .+" # well...
end
private
+ #
+ # the unschedule work itself.
+ #
def do_unschedule (job_id)
+ job = get_job job_id
+
+ return (@cron_jobs.delete(job_id) != nil) if job.is_a?(CronJob)
+
+ return false unless job # not found
+
+ if job.is_a?(AtJob) # catches AtJob and EveryJob instances
+ @non_cron_jobs.delete(job_id)
+ job.params[:dont_reschedule] = true # for AtJob as well, no worries
+ end
+
for i in 0...@pending_jobs.length
if @pending_jobs[i].job_id == job_id
@pending_jobs.delete_at i
- return true
+ return true # asap
end
end
- #
- # not using delete_if because it scans the whole list
- do_unschedule_cron_job job_id
+ true
end
- def do_unschedule_cron_job (job_id)
-
- (@cron_jobs.delete(job_id) != nil)
- end
-
#
# Making sure that params is a Hash.
#
def prepare_params (params)
- params = { :schedulable => params } \
- if params.is_a?(Schedulable)
- params
+ params.is_a?(Schedulable) ? { :schedulable => params } : params
end
#
# The core method behind schedule_at and schedule_in (and also
# schedule_every). It's protected, don't use it directly.
#
def do_schedule_at (at, params={}, &block)
- #puts "0 at is '#{at.to_s}' (#{at.class})"
+ job = params.delete :job
- at = at_to_f at
+ unless job
- #puts "1 at is '#{at.to_s}' (#{at.class})"}"
+ jobClass = params[:every] ? EveryJob : AtJob
- jobClass = params[:every] ? EveryJob : AtJob
+ b = to_block params, &block
- job_id = params[:job_id]
+ job = jobClass.new self, at_to_f(at), params[:job_id], params, &b
+ end
- b = to_block params, &block
+ if jobClass == AtJob && job.at < (Time.new.to_f + @precision)
- job = jobClass.new self, at, job_id, params, &b
+ job.trigger() unless params[:discard_past]
- #do_unschedule(job_id) if job_id
+ @non_cron_jobs.delete job.job_id # just to be sure
- if at < (Time.new.to_f + @precision)
-
- job.trigger() unless params[:discard_past]
return nil
end
+ @non_cron_jobs[job.job_id] = job
+
@schedule_queue << job
job.job_id
end
#
- # Ensures that a duration is a expressed as a Float instance.
- #
- # duration_to_f("10s")
- #
- # will yields 10.0
- #
- def duration_to_f (s)
-
- return s if s.kind_of?(Float)
- return Rufus::parse_time_string(s) if s.kind_of?(String)
- Float(s.to_s)
- end
-
- #
# Ensures an 'at' instance is translated to a float
# (to be compared with the float coming from time.to_f)
#
def at_to_f (at)
at = Rufus::to_ruby_time(at) if at.kind_of?(String)
at = Rufus::to_gm_time(at) if at.kind_of?(DateTime)
at = at.to_f if at.kind_of?(Time)
+
+ raise "cannot schedule at : #{at.inspect}" unless at.is_a?(Float)
+
at
end
#
# Returns a block. If a block is passed, will return it, else,
@@ -836,16 +814,14 @@
#
def to_block (params, &block)
return block if block
- schedulable = params[:schedulable]
+ schedulable = params.delete(:schedulable)
return nil unless schedulable
- params.delete :schedulable
-
l = lambda do
schedulable.trigger(params)
end
class << l
attr_accessor :schedulable
@@ -884,13 +860,10 @@
# determine if there are jobs to trigger else to get back to sleep.
# 'cron' jobs get executed if necessary then 'at' jobs.
#
def step
- #puts Time.now.to_f
- #puts @pending_jobs.collect { |j| [ j.job_id, j.at ] }.inspect
-
step_unschedule
# unschedules any job in the unschedule queue before
# they have a chance to get triggered.
step_trigger
@@ -909,19 +882,11 @@
loop do
break if @unschedule_queue.empty?
- type, job_id = @unschedule_queue.pop
-
- if type == :cron
-
- do_unschedule_cron_job job_id
- else
-
- do_unschedule job_id
- end
+ do_unschedule(@unschedule_queue.pop)
end
end
#
# adds every job waiting in the @schedule_queue to
@@ -945,26 +910,21 @@
end
end
end
#
- # triggers every eligible pending jobs, then every eligible
+ # triggers every eligible pending (at or every) jobs, then every eligible
# cron jobs.
#
def step_trigger
- now = Time.new
+ now = Time.now
- if @exit_when_no_more_jobs
+ if @exit_when_no_more_jobs && @pending_jobs.size < 1
- if @pending_jobs.size < 1
-
- @stopped = true
- return
- end
-
- @dont_reschedule_every = true if at_job_count < 1
+ @stopped = true
+ return
end
# TODO : eventually consider running cron / pending
# job triggering in two different threads
#
@@ -975,16 +935,13 @@
if now.sec != @last_cron_second
@last_cron_second = now.sec
- #puts "step() @cron_jobs.size #{@cron_jobs.size}"
-
@cron_jobs.each do |cron_id, cron_job|
- #puts "step() cron_id : #{cron_id}"
#trigger(cron_job) if cron_job.matches?(now, @precision)
- trigger(cron_job) if cron_job.matches?(now)
+ cron_job.trigger if cron_job.matches?(now)
end
end
#
# pending jobs
@@ -1003,34 +960,17 @@
#if job.at <= now
#
# obviously
- trigger job
+ job.trigger
@pending_jobs.delete_at 0
end
end
#
- # Triggers the job (in a dedicated thread).
- #
- def trigger (job)
-
- Thread.new do
- begin
-
- job.trigger
-
- rescue Exception => e
-
- log_exception e
- end
- end
- end
-
- #
# If an error occurs in the job, it well get caught and an error
# message will be displayed to STDOUT.
# If this scheduler provides a lwarn(message) method, it will
# be used insted.
#
@@ -1106,11 +1046,17 @@
#
# Keeping a copy of the initialization params of the job.
#
attr_reader :params
+ #
+ # if the job is currently executing, this field points to
+ # the 'trigger thread'
+ #
+ attr_reader :trigger_thread
+
def initialize (scheduler, job_id, params, &block)
@scheduler = scheduler
@block = block
@@ -1145,10 +1091,35 @@
#
def unschedule
@scheduler.unschedule(@job_id)
end
+
+ #
+ # Triggers the job (in a dedicated thread).
+ #
+ def trigger
+
+ Thread.new do
+
+ @trigger_thread = Thread.current
+ # keeping track of the thread
+
+ begin
+
+ do_trigger
+
+ rescue Exception => e
+
+ @scheduler.send(:log_exception, e)
+ end
+
+ #@trigger_thread = nil if @trigger_thread = Thread.current
+ @trigger_thread = nil
+ # overlapping executions, what to do ?
+ end
+ end
end
#
# An 'at' job.
#
@@ -1158,28 +1129,18 @@
# The float representation (Time.to_f) of the time at which
# the job should be triggered.
#
attr_accessor :at
- #
- # The constructor.
- #
+
def initialize (scheduler, at, at_id, params, &block)
super(scheduler, at_id, params, &block)
@at = at
end
#
- # Triggers the job (calls the block)
- #
- def trigger
-
- @block.call @job_id, @at
- end
-
- #
# Returns the Time instance at which this job is scheduled.
#
def schedule_info
Time.at(@at)
@@ -1191,10 +1152,22 @@
#
def next_time
schedule_info
end
+
+ protected
+
+ #
+ # Triggers the job (calls the block)
+ #
+ def do_trigger
+
+ @block.call @job_id, @at
+
+ @scheduler.instance_variable_get(:@non_cron_jobs).delete @job_id
+ end
end
#
# An 'every' job is simply an extension of an 'at' job.
#
@@ -1206,10 +1179,52 @@
#
def schedule_info
@params[:every]
end
+
+ protected
+
+ #
+ # triggers the job, then reschedules it if necessary
+ #
+ def do_trigger
+
+ hit_exception = false
+
+ begin
+
+ @block.call @job_id, @at, @params
+
+ rescue Exception => e
+
+ @scheduler.send(:log_exception, e)
+
+ hit_exception = true
+ end
+
+ if \
+ @scheduler.instance_variable_get(:@exit_when_no_more_jobs) or
+ (@params[:dont_reschedule] == true) or
+ (hit_exception and @params[:try_again] == false)
+
+ @scheduler.instance_variable_get(:@non_cron_jobs).delete(job_id)
+ # maybe it'd be better to wipe that reference from here anyway...
+
+ return
+ end
+
+ #
+ # ok, reschedule ...
+
+
+ params[:job] = self
+
+ @at = @at + Rufus.duration_to_f(params[:every])
+
+ @scheduler.send(:do_schedule_at, @at, params)
+ end
end
#
# A cron job.
#
@@ -1251,18 +1266,10 @@
#@cron_line.matches?(time, precision)
@cron_line.matches?(time)
end
#
- # As the name implies.
- #
- def trigger
-
- @block.call @job_id, @cron_line, @params
- end
-
- #
# Returns the original cron tab string used to schedule this
# Job. Like for example "60/3 * * * Sun".
#
def schedule_info
@@ -1278,278 +1285,18 @@
#
def next_time (from=Time.now)
@cron_line.next_time(from)
end
- end
- #
- # A 'cron line' is a line in the sense of a crontab
- # (man 5 crontab) file line.
- #
- class CronLine
+ protected
- #
- # The string used for creating this cronline instance.
- #
- attr_reader :original
-
- attr_reader \
- :seconds,
- :minutes,
- :hours,
- :days,
- :months,
- :weekdays
-
- def initialize (line)
-
- super()
-
- @original = line
-
- items = line.split
-
- unless [ 5, 6 ].include?(items.length)
- raise \
- "cron '#{line}' string should hold 5 or 6 items, " +
- "not #{items.length}" \
- end
-
- offset = items.length - 5
-
- @seconds = if offset == 1
- parse_item(items[0], 0, 59)
- else
- [ 0 ]
- end
- @minutes = parse_item(items[0+offset], 0, 59)
- @hours = parse_item(items[1+offset], 0, 24)
- @days = parse_item(items[2+offset], 1, 31)
- @months = parse_item(items[3+offset], 1, 12)
- @weekdays = parse_weekdays(items[4+offset])
-
- #adjust_arrays()
- end
-
- #
- # Returns true if the given time matches this cron line.
- #
- # (the precision is passed as well to determine if it's
- # worth checking seconds and minutes)
- #
- def matches? (time)
- #def matches? (time, precision)
-
- time = Time.at(time) unless time.kind_of?(Time)
-
- return false \
- if no_match?(time.sec, @seconds)
- #if precision <= 1 and no_match?(time.sec, @seconds)
- return false \
- if no_match?(time.min, @minutes)
- #if precision <= 60 and no_match?(time.min, @minutes)
- return false \
- if no_match?(time.hour, @hours)
- return false \
- if no_match?(time.day, @days)
- return false \
- if no_match?(time.month, @months)
- return false \
- if no_match?(time.wday, @weekdays)
-
- true
- end
-
- #
- # Returns an array of 6 arrays (seconds, minutes, hours, days,
- # months, weekdays).
- # This method is used by the cronline unit tests.
- #
- def to_array
-
- [ @seconds, @minutes, @hours, @days, @months, @weekdays ]
- end
-
- #
- # Returns the next time that this cron line is supposed to 'fire'
- #
- # This is raw, 3 secs to iterate over 1 year on my macbook :( brutal.
- #
- def next_time (now = Time.now)
-
#
- # position now to the next cron second
-
- if @seconds
- next_sec = @seconds.find { |s| s > now.sec } || 60 + @seconds.first
- now += next_sec - now.sec
- else
- now += 1
- end
-
+ # As the name implies.
#
- # prepare sec jump array
+ def do_trigger
- sjarray = nil
-
- if @seconds
-
- sjarray = []
-
- i = @seconds.index(now.sec)
- ii = i
-
- loop do
- cur = @seconds[ii]
- ii += 1
- ii = 0 if ii == @seconds.size
- nxt = @seconds[ii]
- nxt += 60 if ii == 0
- sjarray << (nxt - cur)
- break if ii == i
- end
-
- else
-
- sjarray = [ 1 ]
- end
-
- #
- # ok, seek...
-
- i = 0
-
- loop do
- return now if matches?(now)
- now += sjarray[i]
- i += 1
- i = 0 if i == sjarray.size
- # danger... potentially no exit...
- end
-
- nil
- end
-
- private
-
- #--
- # adjust values to Ruby
- #
- #def adjust_arrays()
- # @hours = @hours.collect { |h|
- # if h == 24
- # 0
- # else
- # h
- # end
- # } if @hours
- # @weekdays = @weekdays.collect { |wd|
- # wd - 1
- # } if @weekdays
- #end
- #
- # dead code, keeping it as a reminder
- #++
-
- WDS = [ "sun", "mon", "tue", "wed", "thu", "fri", "sat" ]
- #
- # used by parse_weekday()
-
- def parse_weekdays (item)
-
- item = item.downcase
-
- WDS.each_with_index do |day, index|
- item = item.gsub day, "#{index}"
- end
-
- r = parse_item item, 0, 7
-
- return r unless r.is_a?(Array)
-
- r.collect { |e| e == 7 ? 0 : e }.uniq
- end
-
- def parse_item (item, min, max)
-
- return nil \
- if item == "*"
- return parse_list(item, min, max) \
- if item.index(",")
- return parse_range(item, min, max) \
- if item.index("*") or item.index("-")
-
- i = Integer(item)
-
- i = min if i < min
- i = max if i > max
-
- [ i ]
- end
-
- def parse_list (item, min, max)
-
- items = item.split(",")
-
- items.inject([]) { |r, i| r.push(parse_range(i, min, max)) }.flatten
- end
-
- def parse_range (item, min, max)
-
- i = item.index("-")
- j = item.index("/")
-
- return item.to_i if (not i and not j)
-
- inc = 1
-
- inc = Integer(item[j+1..-1]) if j
-
- istart = -1
- iend = -1
-
- if i
-
- istart = Integer(item[0..i-1])
-
- if j
- iend = Integer(item[i+1..j])
- else
- iend = Integer(item[i+1..-1])
- end
-
- else # case */x
-
- istart = min
- iend = max
- end
-
- istart = min if istart < min
- iend = max if iend > max
-
- result = []
-
- value = istart
- loop do
-
- result << value
- value = value + inc
- break if value > iend
- end
-
- result
- end
-
- def no_match? (value, cron_values)
-
- return false if not cron_values
-
- cron_values.each do |v|
- return false if value == v # ok, it matches
- end
-
- true # no match found
+ @block.call @job_id, @cron_line, @params
end
end
end