lib/rufus/scheduler.rb in rufus-scheduler-1.0.7 vs lib/rufus/scheduler.rb in rufus-scheduler-1.0.8

- old
+ new

@@ -29,12 +29,12 @@ # require 'thread' require 'monitor' require 'rufus/otime' +require 'rufus/cronline' - module Rufus # # The Scheduler is used by OpenWFEru for registering 'at' and 'cron' jobs. # 'at' jobs to execute once at a given point in time. 'cron' jobs @@ -43,11 +43,11 @@ # # schedule_at() and schedule() await either a Schedulable instance and # params (usually an array or nil), either a block, which is more in the # Ruby way. # - # == The gem "openwferu-scheduler" + # == The gem "rufus-scheduler" # # This scheduler was previously known as the "openwferu-scheduler" gem. # # To ensure that code tapping the previous gem still runs fine with # "rufus-scheduler", this new gem has 'pointers' for the old class @@ -112,10 +112,13 @@ # # scheduler.schedule_every("1h20m") do # regenerate_latest_report() # end # + # (note : a schedule every isn't triggered immediately, thus this example + # will first trigger 1 hour and 20 minutes after being scheduled) + # # The scheduler has a "exit_when_no_more_jobs" attribute. When set to # 'true', the scheduler will exit as soon as there are no more jobs to # run. # Use with care though, if you create a scheduler, set this attribute # to true and start the scheduler, the scheduler will immediately exit. @@ -285,10 +288,24 @@ # You can specify the name of the scheduler's thread. Should make # it easier in some debugging situations. # # scheduler.new :thread_name => "the crazy scheduler" # + # + # == job.trigger_thread + # + # Since rufus-scheduler 1.0.8, you can have access to the thread of + # a job currently being triggered. + # + # job = scheduler.get_job(job_id) + # thread = job.trigger_thread + # + # This new method will return nil if the job is not currently being + # triggered. Not that in case of an every or cron job, this method + # will return the thread of the last triggered instance, thus, in case + # of overlapping executions, you only get the most recent thread. + # class Scheduler # # By default, the precision is 0.250, with means the scheduler # will check for jobs to execute 4 times per second. @@ -322,10 +339,11 @@ super() @pending_jobs = [] @cron_jobs = {} + @non_cron_jobs = {} @schedule_queue = Queue.new @unschedule_queue = Queue.new # # sync between the step() method and the [un]schedule @@ -344,11 +362,11 @@ @thread_name = params[:thread_name] || "rufus scheduler" #@correction = 0.00045 @exit_when_no_more_jobs = false - @dont_reschedule_every = false + #@dont_reschedule_every = false @last_cron_second = -1 @stopped = true end @@ -465,27 +483,37 @@ at, prepare_params(params), &block) end + # + # a shortcut for schedule_at + # + alias :at :schedule_at + # # Schedules a job by stating in how much time it should trigger. # Returns the a job_id that can be used to unschedule the job. # # This method returns a job identifier which can be used to unschedule() # the job. # def schedule_in (duration, params={}, &block) do_schedule_at( - Time.new.to_f + duration_to_f(duration), + Time.new.to_f + Rufus::duration_to_f(duration), prepare_params(params), &block) end # + # a shortcut for schedule_in + # + alias :in :schedule_in + + # # Schedules a job in a loop. After an execution, it will not execute # before the time specified in 'freq'. # # This method returns a job identifier which can be used to unschedule() # the job. @@ -503,81 +531,38 @@ # # scheduler.schedule_every "2d", :first_in => "5h" do # # schedule something every two days, start in 5 hours... # end # + # (without setting a :first_in (or :first_at), our example schedule would + # have had been triggered after two days). + # def schedule_every (freq, params={}, &block) - f = duration_to_f freq - params = prepare_params params - schedulable = params[:schedulable] params[:every] = freq - first_at = params.delete :first_at - first_in = params.delete :first_in + first_at = params[:first_at] + first_in = params[:first_in] - previous_at = params[:previous_at] - - next_at = if first_at - first_at + first_at = if first_at + at_to_f(first_at) elsif first_in - Time.now.to_f + duration_to_f(first_in) - elsif previous_at - previous_at + f + Time.now.to_f + Rufus.duration_to_f(first_in) else - Time.now.to_f + f + Time.now.to_f + Rufus.duration_to_f(freq) # not triggering immediately end - do_schedule_at(next_at, params) do |job_id, at| - - # - # trigger ... - - hit_exception = false - - begin - - if schedulable - schedulable.trigger params - else - block.call job_id, at, params - end - - rescue Exception => e - - log_exception e - - hit_exception = true - end - - # cannot use a return here !!! (block) - - unless \ - @dont_reschedule_every or - (params[:dont_reschedule] == true) or - (hit_exception and params[:try_again] == false) - - # - # ok, reschedule ... - - params[:job_id] = job_id - params[:previous_at] = at - - schedule_every params[:every], params, &block - # - # yes, this is a kind of recursion - - # note that params[:every] might have been changed - # by the block/schedulable code - end - - job_id - end + do_schedule_at(first_at, params, &block) end # + # a shortcut for schedule_every + # + alias :every :schedule_every + + # # Schedules a cron job, the 'cron_line' is a string # following the Unix cron standard (see "man 5 crontab" in your command # line, or http://www.google.com/search?q=man%205%20crontab). # # For example : @@ -605,28 +590,30 @@ params = prepare_params(params) # # is a job with the same id already scheduled ? - cron_id = params[:cron_id] - cron_id = params[:job_id] unless cron_id + cron_id = params[:cron_id] || params[:job_id] - #unschedule(cron_id) if cron_id - @unschedule_queue << [ :cron, cron_id ] + #@unschedule_queue << cron_id # # schedule b = to_block(params, &block) job = CronJob.new(self, cron_id, cron_line, params, &b) - #@cron_jobs[job.job_id] = job @schedule_queue << job job.job_id end + # + # an alias for schedule() + # + alias :cron :schedule + #-- # # The UNscheduling methods # #++ @@ -635,19 +622,21 @@ # Unschedules an 'at' or a 'cron' job identified by the id # it was given at schedule time. # def unschedule (job_id) - @unschedule_queue << [ :at, job_id ] + @unschedule_queue << job_id end # # Unschedules a cron job # + # (deprecated : use unschedule(job_id) for all the jobs !) + # def unschedule_cron_job (job_id) - @unschedule_queue << [ :cron, job_id ] + unschedule(job_id) end #-- # # 'query' methods @@ -658,35 +647,30 @@ # Returns the job corresponding to job_id, an instance of AtJob # or CronJob will be returned. # def get_job (job_id) - @cron_jobs[job_id] || @pending_jobs.find { |job| job.job_id == job_id } + @cron_jobs[job_id] || @non_cron_jobs[job_id] end # # Finds a job (via get_job()) and then returns the wrapped # schedulable if any. # def get_schedulable (job_id) - #return nil unless job_id - j = get_job(job_id) - - return j.schedulable if j.respond_to?(:schedulable) - - nil + j.respond_to?(:schedulable) ? j.schedulable : nil end # # Returns an array of jobs that have the given tag. # def find_jobs (tag) @cron_jobs.values.find_all { |job| job.has_tag?(tag) } + - @pending_jobs.find_all { |job| job.has_tag?(tag) } + @non_cron_jobs.values.find_all { |job| job.has_tag?(tag) } end # # Finds the jobs with the given tag and then returns an array of # the wrapped Schedulable objects. @@ -718,116 +702,110 @@ # # Returns the current count of 'every' jobs scheduled. # def every_job_count - @pending_jobs.select { |j| j.is_a?(EveryJob) }.size + @non_cron_jobs.values.select { |j| j.class == EveryJob }.size end # # Returns the current count of 'at' jobs scheduled (not 'every'). # def at_job_count - @pending_jobs.select { |j| j.instance_of?(AtJob) }.size + @non_cron_jobs.values.select { |j| j.class == AtJob }.size end # # Returns true if the given string seems to be a cron string. # - def Scheduler.is_cron_string (s) + def self.is_cron_string (s) s.match ".+ .+ .+ .+ .+" # well... end private + # + # the unschedule work itself. + # def do_unschedule (job_id) + job = get_job job_id + + return (@cron_jobs.delete(job_id) != nil) if job.is_a?(CronJob) + + return false unless job # not found + + if job.is_a?(AtJob) # catches AtJob and EveryJob instances + @non_cron_jobs.delete(job_id) + job.params[:dont_reschedule] = true # for AtJob as well, no worries + end + for i in 0...@pending_jobs.length if @pending_jobs[i].job_id == job_id @pending_jobs.delete_at i - return true + return true # asap end end - # - # not using delete_if because it scans the whole list - do_unschedule_cron_job job_id + true end - def do_unschedule_cron_job (job_id) - - (@cron_jobs.delete(job_id) != nil) - end - # # Making sure that params is a Hash. # def prepare_params (params) - params = { :schedulable => params } \ - if params.is_a?(Schedulable) - params + params.is_a?(Schedulable) ? { :schedulable => params } : params end # # The core method behind schedule_at and schedule_in (and also # schedule_every). It's protected, don't use it directly. # def do_schedule_at (at, params={}, &block) - #puts "0 at is '#{at.to_s}' (#{at.class})" + job = params.delete :job - at = at_to_f at + unless job - #puts "1 at is '#{at.to_s}' (#{at.class})"}" + jobClass = params[:every] ? EveryJob : AtJob - jobClass = params[:every] ? EveryJob : AtJob + b = to_block params, &block - job_id = params[:job_id] + job = jobClass.new self, at_to_f(at), params[:job_id], params, &b + end - b = to_block params, &block + if jobClass == AtJob && job.at < (Time.new.to_f + @precision) - job = jobClass.new self, at, job_id, params, &b + job.trigger() unless params[:discard_past] - #do_unschedule(job_id) if job_id + @non_cron_jobs.delete job.job_id # just to be sure - if at < (Time.new.to_f + @precision) - - job.trigger() unless params[:discard_past] return nil end + @non_cron_jobs[job.job_id] = job + @schedule_queue << job job.job_id end # - # Ensures that a duration is a expressed as a Float instance. - # - # duration_to_f("10s") - # - # will yields 10.0 - # - def duration_to_f (s) - - return s if s.kind_of?(Float) - return Rufus::parse_time_string(s) if s.kind_of?(String) - Float(s.to_s) - end - - # # Ensures an 'at' instance is translated to a float # (to be compared with the float coming from time.to_f) # def at_to_f (at) at = Rufus::to_ruby_time(at) if at.kind_of?(String) at = Rufus::to_gm_time(at) if at.kind_of?(DateTime) at = at.to_f if at.kind_of?(Time) + + raise "cannot schedule at : #{at.inspect}" unless at.is_a?(Float) + at end # # Returns a block. If a block is passed, will return it, else, @@ -836,16 +814,14 @@ # def to_block (params, &block) return block if block - schedulable = params[:schedulable] + schedulable = params.delete(:schedulable) return nil unless schedulable - params.delete :schedulable - l = lambda do schedulable.trigger(params) end class << l attr_accessor :schedulable @@ -884,13 +860,10 @@ # determine if there are jobs to trigger else to get back to sleep. # 'cron' jobs get executed if necessary then 'at' jobs. # def step - #puts Time.now.to_f - #puts @pending_jobs.collect { |j| [ j.job_id, j.at ] }.inspect - step_unschedule # unschedules any job in the unschedule queue before # they have a chance to get triggered. step_trigger @@ -909,19 +882,11 @@ loop do break if @unschedule_queue.empty? - type, job_id = @unschedule_queue.pop - - if type == :cron - - do_unschedule_cron_job job_id - else - - do_unschedule job_id - end + do_unschedule(@unschedule_queue.pop) end end # # adds every job waiting in the @schedule_queue to @@ -945,26 +910,21 @@ end end end # - # triggers every eligible pending jobs, then every eligible + # triggers every eligible pending (at or every) jobs, then every eligible # cron jobs. # def step_trigger - now = Time.new + now = Time.now - if @exit_when_no_more_jobs + if @exit_when_no_more_jobs && @pending_jobs.size < 1 - if @pending_jobs.size < 1 - - @stopped = true - return - end - - @dont_reschedule_every = true if at_job_count < 1 + @stopped = true + return end # TODO : eventually consider running cron / pending # job triggering in two different threads # @@ -975,16 +935,13 @@ if now.sec != @last_cron_second @last_cron_second = now.sec - #puts "step() @cron_jobs.size #{@cron_jobs.size}" - @cron_jobs.each do |cron_id, cron_job| - #puts "step() cron_id : #{cron_id}" #trigger(cron_job) if cron_job.matches?(now, @precision) - trigger(cron_job) if cron_job.matches?(now) + cron_job.trigger if cron_job.matches?(now) end end # # pending jobs @@ -1003,34 +960,17 @@ #if job.at <= now # # obviously - trigger job + job.trigger @pending_jobs.delete_at 0 end end # - # Triggers the job (in a dedicated thread). - # - def trigger (job) - - Thread.new do - begin - - job.trigger - - rescue Exception => e - - log_exception e - end - end - end - - # # If an error occurs in the job, it well get caught and an error # message will be displayed to STDOUT. # If this scheduler provides a lwarn(message) method, it will # be used insted. # @@ -1106,11 +1046,17 @@ # # Keeping a copy of the initialization params of the job. # attr_reader :params + # + # if the job is currently executing, this field points to + # the 'trigger thread' + # + attr_reader :trigger_thread + def initialize (scheduler, job_id, params, &block) @scheduler = scheduler @block = block @@ -1145,10 +1091,35 @@ # def unschedule @scheduler.unschedule(@job_id) end + + # + # Triggers the job (in a dedicated thread). + # + def trigger + + Thread.new do + + @trigger_thread = Thread.current + # keeping track of the thread + + begin + + do_trigger + + rescue Exception => e + + @scheduler.send(:log_exception, e) + end + + #@trigger_thread = nil if @trigger_thread = Thread.current + @trigger_thread = nil + # overlapping executions, what to do ? + end + end end # # An 'at' job. # @@ -1158,28 +1129,18 @@ # The float representation (Time.to_f) of the time at which # the job should be triggered. # attr_accessor :at - # - # The constructor. - # + def initialize (scheduler, at, at_id, params, &block) super(scheduler, at_id, params, &block) @at = at end # - # Triggers the job (calls the block) - # - def trigger - - @block.call @job_id, @at - end - - # # Returns the Time instance at which this job is scheduled. # def schedule_info Time.at(@at) @@ -1191,10 +1152,22 @@ # def next_time schedule_info end + + protected + + # + # Triggers the job (calls the block) + # + def do_trigger + + @block.call @job_id, @at + + @scheduler.instance_variable_get(:@non_cron_jobs).delete @job_id + end end # # An 'every' job is simply an extension of an 'at' job. # @@ -1206,10 +1179,52 @@ # def schedule_info @params[:every] end + + protected + + # + # triggers the job, then reschedules it if necessary + # + def do_trigger + + hit_exception = false + + begin + + @block.call @job_id, @at, @params + + rescue Exception => e + + @scheduler.send(:log_exception, e) + + hit_exception = true + end + + if \ + @scheduler.instance_variable_get(:@exit_when_no_more_jobs) or + (@params[:dont_reschedule] == true) or + (hit_exception and @params[:try_again] == false) + + @scheduler.instance_variable_get(:@non_cron_jobs).delete(job_id) + # maybe it'd be better to wipe that reference from here anyway... + + return + end + + # + # ok, reschedule ... + + + params[:job] = self + + @at = @at + Rufus.duration_to_f(params[:every]) + + @scheduler.send(:do_schedule_at, @at, params) + end end # # A cron job. # @@ -1251,18 +1266,10 @@ #@cron_line.matches?(time, precision) @cron_line.matches?(time) end # - # As the name implies. - # - def trigger - - @block.call @job_id, @cron_line, @params - end - - # # Returns the original cron tab string used to schedule this # Job. Like for example "60/3 * * * Sun". # def schedule_info @@ -1278,278 +1285,18 @@ # def next_time (from=Time.now) @cron_line.next_time(from) end - end - # - # A 'cron line' is a line in the sense of a crontab - # (man 5 crontab) file line. - # - class CronLine + protected - # - # The string used for creating this cronline instance. - # - attr_reader :original - - attr_reader \ - :seconds, - :minutes, - :hours, - :days, - :months, - :weekdays - - def initialize (line) - - super() - - @original = line - - items = line.split - - unless [ 5, 6 ].include?(items.length) - raise \ - "cron '#{line}' string should hold 5 or 6 items, " + - "not #{items.length}" \ - end - - offset = items.length - 5 - - @seconds = if offset == 1 - parse_item(items[0], 0, 59) - else - [ 0 ] - end - @minutes = parse_item(items[0+offset], 0, 59) - @hours = parse_item(items[1+offset], 0, 24) - @days = parse_item(items[2+offset], 1, 31) - @months = parse_item(items[3+offset], 1, 12) - @weekdays = parse_weekdays(items[4+offset]) - - #adjust_arrays() - end - - # - # Returns true if the given time matches this cron line. - # - # (the precision is passed as well to determine if it's - # worth checking seconds and minutes) - # - def matches? (time) - #def matches? (time, precision) - - time = Time.at(time) unless time.kind_of?(Time) - - return false \ - if no_match?(time.sec, @seconds) - #if precision <= 1 and no_match?(time.sec, @seconds) - return false \ - if no_match?(time.min, @minutes) - #if precision <= 60 and no_match?(time.min, @minutes) - return false \ - if no_match?(time.hour, @hours) - return false \ - if no_match?(time.day, @days) - return false \ - if no_match?(time.month, @months) - return false \ - if no_match?(time.wday, @weekdays) - - true - end - - # - # Returns an array of 6 arrays (seconds, minutes, hours, days, - # months, weekdays). - # This method is used by the cronline unit tests. - # - def to_array - - [ @seconds, @minutes, @hours, @days, @months, @weekdays ] - end - - # - # Returns the next time that this cron line is supposed to 'fire' - # - # This is raw, 3 secs to iterate over 1 year on my macbook :( brutal. - # - def next_time (now = Time.now) - # - # position now to the next cron second - - if @seconds - next_sec = @seconds.find { |s| s > now.sec } || 60 + @seconds.first - now += next_sec - now.sec - else - now += 1 - end - + # As the name implies. # - # prepare sec jump array + def do_trigger - sjarray = nil - - if @seconds - - sjarray = [] - - i = @seconds.index(now.sec) - ii = i - - loop do - cur = @seconds[ii] - ii += 1 - ii = 0 if ii == @seconds.size - nxt = @seconds[ii] - nxt += 60 if ii == 0 - sjarray << (nxt - cur) - break if ii == i - end - - else - - sjarray = [ 1 ] - end - - # - # ok, seek... - - i = 0 - - loop do - return now if matches?(now) - now += sjarray[i] - i += 1 - i = 0 if i == sjarray.size - # danger... potentially no exit... - end - - nil - end - - private - - #-- - # adjust values to Ruby - # - #def adjust_arrays() - # @hours = @hours.collect { |h| - # if h == 24 - # 0 - # else - # h - # end - # } if @hours - # @weekdays = @weekdays.collect { |wd| - # wd - 1 - # } if @weekdays - #end - # - # dead code, keeping it as a reminder - #++ - - WDS = [ "sun", "mon", "tue", "wed", "thu", "fri", "sat" ] - # - # used by parse_weekday() - - def parse_weekdays (item) - - item = item.downcase - - WDS.each_with_index do |day, index| - item = item.gsub day, "#{index}" - end - - r = parse_item item, 0, 7 - - return r unless r.is_a?(Array) - - r.collect { |e| e == 7 ? 0 : e }.uniq - end - - def parse_item (item, min, max) - - return nil \ - if item == "*" - return parse_list(item, min, max) \ - if item.index(",") - return parse_range(item, min, max) \ - if item.index("*") or item.index("-") - - i = Integer(item) - - i = min if i < min - i = max if i > max - - [ i ] - end - - def parse_list (item, min, max) - - items = item.split(",") - - items.inject([]) { |r, i| r.push(parse_range(i, min, max)) }.flatten - end - - def parse_range (item, min, max) - - i = item.index("-") - j = item.index("/") - - return item.to_i if (not i and not j) - - inc = 1 - - inc = Integer(item[j+1..-1]) if j - - istart = -1 - iend = -1 - - if i - - istart = Integer(item[0..i-1]) - - if j - iend = Integer(item[i+1..j]) - else - iend = Integer(item[i+1..-1]) - end - - else # case */x - - istart = min - iend = max - end - - istart = min if istart < min - iend = max if iend > max - - result = [] - - value = istart - loop do - - result << value - value = value + inc - break if value > iend - end - - result - end - - def no_match? (value, cron_values) - - return false if not cron_values - - cron_values.each do |v| - return false if value == v # ok, it matches - end - - true # no match found + @block.call @job_id, @cron_line, @params end end end