lib/openwfe/util/scheduler.rb in openwferu-scheduler-0.9.15.1172 vs lib/openwfe/util/scheduler.rb in openwferu-scheduler-0.9.16

- old
+ new

@@ -296,43 +296,43 @@ # # Schedules a job in a loop. After an execution, it will not execute # before the time specified in 'freq'. # - # Note that if your job takes 2s to execute and the freq is set to - # 10s, it will in fact execute every 12s. - # You can however wrap the code within its own thread : - # - # scheduler.schedule_every("12s") do - # Thread.new do - # do_the_job() - # end - # end - # # This method returns a job identifier which can be used to unschedule() # the job. # def schedule_every (freq, params={}, &block) f = duration_to_f freq params = prepare_params params schedulable = params[:schedulable] - params[:every] = true + params[:every] = freq - sschedule_at Time.new.to_f + f, params do |job_id, at| + last_at = params[:last_at] + next_at = if last_at + last_at + f + else + Time.now.to_f + f + end - params[:job_id] = job_id + sschedule_at next_at, params do |job_id, at| if schedulable schedulable.trigger(params) else block.call job_id, at end + + params[:job_id] = job_id + params[:last_at] = at - schedule_every(f, params, &block) \ + schedule_every(freq, params, &block) \ unless @dont_reschedule_every + # + # yes, this is a kind of recursion job_id end end @@ -403,17 +403,15 @@ cron_id = params[:cron_id] cron_id = params[:job_id] unless cron_id unschedule(cron_id) if cron_id - tags = params[:tags] - # # schedule b = to_block(params, &block) - job = CronJob.new(self, cron_id, cron_line, tags, &b) + job = CronJob.new(self, cron_id, cron_line, params, &b) @cron_jobs[job.job_id] = job job.job_id end end @@ -425,12 +423,14 @@ def get_job (job_id) job = @cron_jobs[job_id] return job if job - @pending_jobs.find do |job| - job.job_id == job_id + synchronize do + @pending_jobs.find do |job| + job.job_id == job_id + end end end # # Finds a job (via get_job()) and then returns the wrapped @@ -454,12 +454,14 @@ result = @cron_jobs.values.find_all do |job| job.has_tag?(tag) end - result + @pending_jobs.find_all do |job| - job.has_tag?(tag) + synchronize do + result + @pending_jobs.find_all do |job| + job.has_tag?(tag) + end end end # # Finds the jobs with the given tag and then returns an array of @@ -552,15 +554,15 @@ else AtJob end job_id = params[:job_id] - tags = params[:tags] b = to_block(params, &block) - job = jobClass.new(self, at, job_id, tags, &b) + job = jobClass.new(self, at, job_id, params, &b) + unschedule(job_id) if job_id if at < (Time.new.to_f + @precision) job.trigger() unless params[:discard_past] return nil @@ -668,10 +670,12 @@ @dont_reschedule_every = true if at_job_count < 1 end # TODO : eventually consider running cron / pending # job triggering in two different threads + # + # but well... there's the synchronization issue... # # cron jobs if now.sec != @last_cron_second @@ -708,17 +712,25 @@ #if job.at <= now # # obviously - trigger(job) + trigger job @pending_jobs.delete_at(0) end end end + # + # Triggers the job (in a dedicated thread). + # + # If an error occurs in the job, it well get caught and an error + # message will be displayed to STDOUT. + # If this scheduler provides a lwarn(message) method, it will + # be used insted. + # def trigger (job) Thread.new do begin job.trigger @@ -756,22 +768,48 @@ JOB_ID_LOCK = Monitor.new # # would it be better to use a Mutex instead of a full-blown # Monitor ? + # + # The parent class for scheduled jobs. + # class Job @@last_given_id = 0 # # as a scheduler is fully transient, no need to # have persistent ids, a simple counter is sufficient - attr_accessor :job_id, :tags, :block + # + # The identifier for the job + # + attr_accessor :job_id + + # + # An array of tags + # + attr_accessor :tags + + # + # The block to execute at trigger time + # + attr_accessor :block + + # + # A reference to the scheduler + # attr_reader :scheduler + + # + # Keeping a copy of the initialization params of the job. + # + attr_reader :params - def initialize (scheduler, job_id, tags, &block) + def initialize (scheduler, job_id, params, &block) + @scheduler = scheduler @block = block if job_id @job_id = job_id @@ -780,83 +818,156 @@ @job_id = @@last_given_id @@last_given_id = @job_id + 1 end end + @params = params + #@tags = Array(tags).collect { |tag| tag.to_s } # making sure we have an array of String tags - @tags = Array(tags) + @tags = Array(params[:tags]) # any tag is OK end # # Returns true if this job sports the given tag # def has_tag? (tag) + @tags.include?(tag) end # # Removes (cancels) this job from its scheduler. # def unschedule + @scheduler.unschedule(@job_id) end end + # + # An 'at' job. class AtJob < Job + # + # The float representation (Time.to_f) of the time at which + # the job should be triggered. + # attr_accessor :at - def initialize (scheduler, at, at_id, tags, &block) - super(scheduler, at_id, tags, &block) + # + # The constructor. + # + def initialize (scheduler, at, at_id, params, &block) + + super(scheduler, at_id, params, &block) @at = at end + # + # Triggers the job (calls the block) + # def trigger + @block.call @job_id, @at end + + # + # Returns the Time instance at which this job is scheduled. + # + def schedule_info + + Time.at(@at) + end end + # + # An 'every' job is simply an extension of an 'at' job. + # class EveryJob < AtJob + + # + # Returns the frequency string used to schedule this EveryJob, + # like for example "3d" or "1M10d3h". + # + def schedule_info + + @params[:every] + end end + # + # A cron job. + # class CronJob < Job + # + # The CronLine instance representing the times at which + # the cron job has to be triggered. + # attr_accessor :cron_line - def initialize (scheduler, cron_id, line, tags, &block) + def initialize (scheduler, cron_id, line, params, &block) - super(scheduler, cron_id, tags, &block) + super(scheduler, cron_id, params, &block) - if line.kind_of?(String) + if line.is_a?(String) + @cron_line = CronLine.new(line) - elsif line.kind_of?(CronLine) + + elsif line.is_a?(CronLine) + @cron_line = line + else + raise \ "Cannot initialize a CronJob " + "with a param of class #{line.class}" end end + # + # This is the method called by the scheduler to determine if it + # has to fire this CronJob instance. + # def matches? (time) + @cron_line.matches? time end + # + # As the name implies. + # def trigger + @block.call @job_id, @cron_line end + + # + # Returns the original cron tab string used to schedule this + # Job. Like for example "60/3 * * * Sun". + # + def schedule_info + + @cron_line.original + end end # # A 'cron line' is a line in the sense of a crontab # (man 5 crontab) file line. # class CronLine + # + # The string used for creating this cronline instance. + # + attr_reader :original + attr_reader \ :seconds, :minutes, :hours, :days, @@ -865,10 +976,12 @@ def initialize (line) super() + @original = line + items = line.split unless [ 5, 6 ].include?(items.length) raise \ "cron '#{line}' string should hold 5 or 6 items, " + @@ -889,10 +1002,13 @@ @weekdays = parse_weekdays(items[4+offset]) #adjust_arrays() end + # + # Returns true if the given time matches this cron line. + # def matches? (time) if time.kind_of?(Float) or time.kind_of?(Integer) time = Time.at(time) end @@ -916,11 +1032,11 @@ [ @seconds, @minutes, @hours, @days, @months, @weekdays ] end private - # + #-- # adjust values to Ruby # #def adjust_arrays() # @hours = @hours.collect { |h| # if h == 24 @@ -932,10 +1048,11 @@ # @weekdays = @weekdays.collect { |wd| # wd - 1 # } if @weekdays #end # - # dead code + # dead code, keeping as a reminder + #++ WDS = [ "mon", "tue", "wed", "thu", "fri", "sat", "sun" ] # # used by parse_weekday()