lib/openwfe/util/scheduler.rb in openwferu-scheduler-0.9.15.1110 vs lib/openwfe/util/scheduler.rb in openwferu-scheduler-0.9.15.1127
- old
+ new
@@ -1,6 +1,6 @@
-#
+
#--
# Copyright (c) 2006-2007, John Mettraux, OpenWFE.org
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
@@ -100,19 +100,41 @@
# Use with care though, if you create a scheduler, set this attribute
# to true and start the scheduler, the scheduler will immediately exit.
# This attribute is best used indirectly : the method
# join_until_no_more_jobs() wraps it.
#
- # Since OpenWFEru 0.9.13, the :scheduler_precision can be set when
- # instantiating the scheduler.
+ # The :scheduler_precision can be set when instantiating the scheduler.
#
# scheduler = OpenWFE::Scheduler.new(:scheduler_precision => 0.500)
# scheduler.start
# #
# # instatiates a scheduler that checks its jobs twice per second
# # (the default is 4 times per second (0.250))
#
+ #
+ # Since OpenWFEru 0.9.16, tags can be attached to jobs scheduled :
+ #
+ # scheduler.schedule_in "2h", :tags => "backup" do
+ # init_backup_sequence()
+ # end
+ #
+ # scheduler.schedule "0 24 * * *", :tags => "new_day" do
+ # do_this_or_that()
+ # end
+ #
+ # jobs = find_jobs 'backup'
+ #
+ # Multiple tags may be attached to a single job :
+ #
+ # scheduler.schedule_in "2h", :tags => [ "backup", "important" ] do
+ # init_backup_sequence()
+ # end
+ #
+ # The vanilla case for tags assume they are String instances, but nothing
+ # prevents you from using anything else. The scheduler has no persistence
+ # by itself, so no serialization issue.
+ #
class Scheduler
include MonitorMixin
#
# By default, the precision is 0.250, with means the scheduler
@@ -129,11 +151,11 @@
def initialize (params={})
super()
@pending_jobs = []
- @cron_entries = {}
+ @cron_jobs = {}
@scheduler_thread = nil
@precision = 0.250
# every 250ms, the scheduler wakes up (default value)
@@ -159,12 +181,15 @@
@stopped = false
@scheduler_thread = Thread.new do
if defined?(JRUBY_VERSION)
+
require 'java'
- java.lang.Thread.current_thread.name = "openwferu scheduler (Ruby Thread)"
+
+ java.lang.Thread.current_thread.name = \
+ "openwferu scheduler (Ruby Thread)"
end
while true
break if @stopped
step
@@ -296,11 +321,11 @@
#
def unschedule (job_id)
synchronize do
for i in 0...@pending_jobs.length
- if @pending_jobs[i].eid == job_id
+ if @pending_jobs[i].job_id == job_id
@pending_jobs.delete_at i
return true
end
end
@@ -311,12 +336,12 @@
#
# Unschedules a cron job
#
def unschedule_cron_job (job_id)
synchronize do
- if @cron_entries.has_key?(job_id)
- @cron_entries.delete job_id
+ if @cron_jobs.has_key?(job_id)
+ @cron_jobs.delete job_id
return true
end
false
end
end
@@ -357,32 +382,34 @@
cron_id = params[:cron_id]
cron_id = params[:job_id] unless cron_id
unschedule(cron_id) if cron_id
+ tags = params[:tags]
+
#
# schedule
b = to_block(params, &block)
- entry = CronEntry.new(cron_id, cron_line, &b)
- @cron_entries[entry.eid] = entry
+ job = CronJob.new(cron_id, cron_line, tags, &b)
+ @cron_jobs[job.job_id] = job
- entry.eid
+ job.job_id
end
end
#
- # Returns the job corresponding to job_id, an instance of AtEntry
- # or CronEntry will be returned.
+ # Returns the job corresponding to job_id, an instance of AtJob
+ # or CronJob will be returned.
#
def get_job (job_id)
- entry = @cron_entries[job_id]
- return entry if entry
+ job = @cron_jobs[job_id]
+ return job if job
- @pending_jobs.find do |entry|
- entry.eid == job_id
+ @pending_jobs.find do |job|
+ job.job_id == job_id
end
end
#
# Finds a job (via get_job()) and then returns the wrapped
@@ -392,16 +419,49 @@
return nil unless job_id
j = get_job(job_id)
- return j.schedulable if j.respond_to? :schedulable
+ return j.schedulable if j.respond_to?(:schedulable)
nil
end
#
+ # Returns an array of jobs that have the given tag.
+ #
+ def find_jobs (tag)
+
+ result = @cron_jobs.values.find_all do |job|
+ job.has_tag?(tag)
+ end
+
+ result + @pending_jobs.find_all do |job|
+ job.has_tag?(tag)
+ end
+ end
+
+ #
+ # Finds the jobs with the given tag and then returns an array of
+ # the wrapped Schedulable objects.
+ # Jobs that haven't a wrapped Schedulable won't be included in the
+ # result.
+ #
+ def find_schedulables (tag)
+
+ jobs = find_jobs(tag)
+
+ result = []
+
+ jobs.each do |job|
+ result.push(job.schedulable) if job.respond_to?(:schedulable)
+ end
+
+ result
+ end
+
+ #
# Returns the number of currently pending jobs in this scheduler
# ('at' jobs and 'every' jobs).
#
def pending_job_count
@pending_jobs.size
@@ -409,25 +469,25 @@
#
# Returns the number of cron jobs currently active in this scheduler.
#
def cron_job_count
- @cron_entries.size
+ @cron_jobs.size
end
#
# Returns the current count of 'every' jobs scheduled.
#
def every_job_count
- @pending_jobs.select { |j| j.is_a?(EveryEntry) }.size
+ @pending_jobs.select { |j| j.is_a?(EveryJob) }.size
end
#
# Returns the current count of 'at' jobs scheduled (not 'every').
#
def at_job_count
- @pending_jobs.select { |j| j.instance_of?(AtEntry) }.size
+ @pending_jobs.select { |j| j.instance_of?(AtJob) }.size
end
#
# Returns true if the given string seems to be a cron string.
#
@@ -465,19 +525,20 @@
if at.kind_of?(Time)
#puts "1 at is '#{at.to_s}' (#{at.class})"}"
jobClass = if params[:every]
- EveryEntry
+ EveryJob
else
- AtEntry
+ AtJob
end
job_id = params[:job_id]
+ tags = params[:tags]
b = to_block(params, &block)
- job = jobClass.new(at, job_id, &b)
+ job = jobClass.new(at, job_id, tags, &b)
unschedule(job_id) if job_id
if at < (Time.new.to_f + @precision)
job.trigger() unless params[:discard_past]
@@ -559,11 +620,11 @@
@pending_jobs[index, 0] = job
end
#puts "push() at '#{Time.at(job.at)}'"
- job.eid
+ job.job_id
end
#
# This is the method called each time the scheduler wakes up
# (by default 4 times per second). It's meant to quickly
@@ -585,24 +646,24 @@
@dont_reschedule_every = true if at_job_count < 1
end
#
- # cron entries
+ # cron jobs
if now.sec == 0 and now.min != @last_cron_minute
#
- # only consider cron entries at the second 0 of a
+ # only consider cron jobs at the second 0 of a
# minute
@last_cron_minute = now.min
- #puts "step() @cron_entries.size #{@cron_entries.size}"
+ #puts "step() @cron_jobs.size #{@cron_jobs.size}"
- @cron_entries.each do |cron_id, cron_entry|
+ @cron_jobs.each do |cron_id, cron_job|
#puts "step() cron_id : #{cron_id}"
- trigger(cron_entry) if cron_entry.matches?(now)
+ trigger(cron_job) if cron_job.matches?(now)
end
end
#
# pending jobs
@@ -633,15 +694,15 @@
@pending_jobs.delete_at(0)
end
end
end
- def trigger (entry)
+ def trigger (job)
Thread.new do
begin
- entry.trigger
+ job.trigger
rescue Exception => e
message =
"trigger() caught exception\n" +
OpenWFE::exception_to_s(e)
if self.respond_to? :lwarn
@@ -670,82 +731,94 @@
end
protected
JOB_ID_LOCK = Monitor.new
+ #
+ # would it be better to use a Mutex instead of a full-blown
+ # Monitor ?
- class Entry
+ class Job
@@last_given_id = 0
#
# as a scheduler is fully transient, no need to
# have persistent ids, a simple counter is sufficient
attr_accessor \
- :eid, :block
+ :job_id, :tags, :block
+
+ def initialize (job_id, tags, &block)
- def initialize (entry_id=nil, &block)
@block = block
- if entry_id
- @eid = entry_id
+
+ if job_id
+ @job_id = job_id
else
JOB_ID_LOCK.synchronize do
- @eid = @@last_given_id
- @@last_given_id = @eid + 1
+ @job_id = @@last_given_id
+ @@last_given_id = @job_id + 1
end
end
+
+ #@tags = Array(tags).collect { |tag| tag.to_s }
+ # making sure we have an array of String tags
+
+ @tags = Array(tags)
+ # any tag is OK
end
- #def trigger
- # @block.call @eid
- #end
+ #
+ # Returns true if this job sports the given tag
+ #
+ def has_tag? (tag)
+ @tags.include?(tag)
+ end
end
- class AtEntry < Entry
+ class AtJob < Job
- attr_accessor \
- :at
+ attr_accessor :at
- def initialize (at, at_id, &block)
- super(at_id, &block)
+ def initialize (at, at_id, tags, &block)
+ super(at_id, tags, &block)
@at = at
end
def trigger
- @block.call @eid, @at
+ @block.call @job_id, @at
end
end
- class EveryEntry < AtEntry
+ class EveryJob < AtJob
end
- class CronEntry < Entry
+ class CronJob < Job
- attr_accessor \
- :cron_line
+ attr_accessor :cron_line
- def initialize (cron_id, line, &block)
+ def initialize (cron_id, line, tags, &block)
- super(cron_id, &block)
+ super(cron_id, tags, &block)
if line.kind_of?(String)
@cron_line = CronLine.new(line)
elsif line.kind_of?(CronLine)
@cron_line = line
else
raise \
- "Cannot initialize a CronEntry " +
+ "Cannot initialize a CronJob " +
"with a param of class #{line.class}"
end
end
def matches? (time)
@cron_line.matches? time
end
def trigger
- @block.call @eid, @cron_line
+ @block.call @job_id, @cron_line
end
end
#
# A 'cron line' is a line in the sense of a crontab
@@ -791,11 +864,11 @@
return false if no_match?(time.hour, @hours)
return false if no_match?(time.day, @days)
return false if no_match?(time.month, @months)
return false if no_match?(time.wday, @weekdays)
- return true
+ true
end
#
# Returns an array of 5 arrays (minutes, hours, days, months,
# weekdays).
@@ -833,11 +906,11 @@
WDS.each_with_index do |day, index|
item = item.gsub(day, "#{index+1}")
end
- return parse_item(item, 1, 7)
+ parse_item(item, 1, 7)
end
def parse_item (item, min, max)
return nil \
@@ -850,11 +923,11 @@
i = Integer(item)
i = min if i < min
i = max if i > max
- return [ i ]
+ [ i ]
end
def parse_list (item, min, max)
items = item.split(",")
result = []
@@ -862,11 +935,11 @@
i = Integer(i)
i = min if i < min
i = max if i > max
result << i
end
- return result
+ result
end
def parse_range (item, min, max)
i = item.index("-")
j = item.index("/")
@@ -903,21 +976,21 @@
result << value
value = value + inc
break if value > iend
end
- return result
+ result
end
def no_match? (value, cron_values)
return false if not cron_values
cron_values.each do |v|
return false if value == v
end
- return true
+ true
end
end
end