lib/rufus/scheduler.rb in rufus-scheduler-1.0.11 vs lib/rufus/scheduler.rb in rufus-scheduler-1.0.12
- old
+ new
@@ -27,11 +27,10 @@
#
# John Mettraux at openwfe.org
#
require 'thread'
-require 'monitor'
require 'rufus/otime'
require 'rufus/cronline'
module Rufus
@@ -80,10 +79,13 @@
#
# job_id = scheduler.schedule_at "Sun Oct 07 14:24:01 +0900 2009" do
# init_self_destruction_sequence()
# end
#
+ # scheduler.join # join the scheduler (prevents exiting)
+ #
+ #
# an example that uses a Schedulable class :
#
# class Regenerator < Schedulable
# def trigger (frequency)
# self.send(frequency)
@@ -216,18 +218,31 @@
# puts "oops, something wrong happened : "
# puts block.call
# end
# end
#
+ # # or
+ #
+ # def scheduler.lwarn (&block)
+ # puts "oops, something wrong happened : "
+ # puts block.call
+ # end
+ #
# # 2 - overriding the [protected] method log_exception(e) :
#
# class << scheduler
# def log_exception (e)
# puts "something wrong happened : "+e.to_s
# end
# end
#
+ # # or
+ #
+ # def scheduler.log_exception (e)
+ # puts "something wrong happened : "+e.to_s
+ # end
+ #
# == 'Every jobs' and rescheduling
#
# Every jobs can reschedule/unschedule themselves. A reschedule example :
#
# schedule.schedule_every "5h" do |job_id, at, params|
@@ -302,13 +317,29 @@
# This new method will return nil if the job is not currently being
# triggered. Not that in case of an every or cron job, this method
# will return the thread of the last triggered instance, thus, in case
# of overlapping executions, you only get the most recent thread.
#
+ #
+ # == specifying a :timeout for a job
+ #
+ # rufus-scheduler 1.0.12 introduces a :timeout parameter for jobs.
+ #
+ # scheduler.every "3h", :timeout => '2h30m' do
+ # do_that_long_job()
+ # end
+ #
+ # after 2 hours and half, the 'long job' will get interrupted by a
+ # Rufus::TimeOutError (so that you know what to catch).
+ #
+ # :timeout is applicable to all types of jobs : at, in, every, cron. It
+ # accepts a String value following the "Mdhms" scheme the rufus-scheduler
+ # uses.
+ #
class Scheduler
- VERSION = '1.0.11'
+ VERSION = '1.0.12'
#
# By default, the precision is 0.250, with means the scheduler
# will check for jobs to execute 4 times per second.
#
@@ -317,11 +348,11 @@
#
# Setting the precision ( 0.0 < p <= 1.0 )
#
def precision= (f)
- raise "precision must be 0.0 < p <= 1.0" \
+ raise 'precision must be 0.0 < p <= 1.0' \
if f <= 0.0 or f > 1.0
@precision = f
end
@@ -399,11 +430,11 @@
d = Time.now.to_f - t0 # + @correction
next if d > @precision
- sleep (@precision - d)
+ sleep(@precision - d)
end
end
end
#
@@ -550,16 +581,18 @@
# (without setting a :first_in (or :first_at), our example schedule would
# have had been triggered after two days).
#
def schedule_every (freq, params={}, &block)
- params = prepare_params params
+ params = prepare_params(params)
params[:every] = freq
first_at = params[:first_at]
first_in = params[:first_in]
+ #params[:delayed] = true if first_at or first_in
+
first_at = if first_at
at_to_f(first_at)
elsif first_in
Time.now.to_f + Rufus.duration_to_f(first_in)
else
@@ -677,17 +710,26 @@
end
#
# Returns an array of jobs that have the given tag.
#
- def find_jobs (tag)
+ def find_jobs (tag=nil)
- @cron_jobs.values.find_all { |job| job.has_tag?(tag) } +
- @non_cron_jobs.values.find_all { |job| job.has_tag?(tag) }
+ jobs = @cron_jobs.values + @non_cron_jobs.values
+ jobs = jobs.select { |job| job.has_tag?(tag) } if tag
+ jobs
end
#
+ # Returns all the jobs in the scheduler.
+ #
+ def all_jobs
+
+ find_jobs()
+ end
+
+ #
# Finds the jobs with the given tag and then returns an array of
# the wrapped Schedulable objects.
# Jobs that haven't a wrapped Schedulable won't be included in the
# result.
#
@@ -777,19 +819,19 @@
# The core method behind schedule_at and schedule_in (and also
# schedule_every). It's protected, don't use it directly.
#
def do_schedule_at (at, params={}, &block)
- job = params.delete :job
+ job = params.delete(:job)
unless job
jobClass = params[:every] ? EveryJob : AtJob
- b = to_block params, &block
+ b = to_block(params, &block)
- job = jobClass.new self, at_to_f(at), params[:job_id], params, &b
+ job = jobClass.new(self, at_to_f(at), params[:job_id], params, &b)
end
if jobClass == AtJob && job.at < (Time.new.to_f + @precision)
job.trigger() unless params[:discard_past]
@@ -1018,13 +1060,19 @@
def reschedule (scheduler)
raise "reschedule() implentation is missing"
end
end
+ #
+ # This error is thrown when the :timeout attribute triggers
+ #
+ class TimeOutError < RuntimeError
+ end
+
protected
- JOB_ID_LOCK = Monitor.new
+ JOB_ID_LOCK = Mutex.new
#
# would it be better to use a Mutex instead of a full-blown
# Monitor ?
#
@@ -1129,10 +1177,22 @@
#@trigger_thread = nil if @trigger_thread = Thread.current
@trigger_thread = nil
# overlapping executions, what to do ?
end
+
+ if trigger_thread_alive? and (to = @params[:timeout])
+ @scheduler.in(to, :tags => 'timeout') do
+ @trigger_thread.raise(Rufus::TimeOutError) if trigger_thread_alive?
+ end
+ end
end
+
+ protected
+
+ def trigger_thread_alive?
+ (@trigger_thread && @trigger_thread.alive?)
+ end
end
#
# An 'at' job.
#