lib/resque/scheduler.rb in resque-scheduler-2.0.0.d vs lib/resque/scheduler.rb in resque-scheduler-2.0.0.e
- old
+ new
@@ -9,31 +9,43 @@
class << self
# If true, logs more stuff...
attr_accessor :verbose
-
+
# If set, produces no output
attr_accessor :mute
-
+
# If set, will try to update the schulde in the loop
attr_accessor :dynamic
+ # Amount of time in seconds to sleep between polls of the delayed
+ # queue. Defaults to 5
+ attr_writer :poll_sleep_amount
+
# the Rufus::Scheduler jobs that are scheduled
def scheduled_jobs
@@scheduled_jobs
end
+
+ def poll_sleep_amount
+ @poll_sleep_amount ||= 5 # seconds
+ end
# Schedule all jobs and continually look for delayed jobs (never returns)
def run
$0 = "resque-scheduler: Starting"
# trap signals
register_signal_handlers
# Load the schedule into rufus
- procline "Loading Schedule"
- load_schedule!
+ # If dynamic is set, load that schedule otherwise use normal load
+ if dynamic
+ reload_schedule!
+ else
+ load_schedule!
+ end
# Now start the scheduling part of the loop.
loop do
begin
handle_delayed_items
@@ -51,37 +63,49 @@
# poll/enqueing to finish (should be almost istant). In the
# case of sleeping, exit immediately.
def register_signal_handlers
trap("TERM") { shutdown }
trap("INT") { shutdown }
-
+
begin
- trap('QUIT') { shutdown }
- trap('USR1') { kill_child }
+ trap('QUIT') { shutdown }
+ trap('USR1') { print_schedule }
trap('USR2') { reload_schedule! }
rescue ArgumentError
warn "Signals QUIT and USR1 and USR2 not supported."
end
end
+ def print_schedule
+ if rufus_scheduler
+ log! "Scheduling Info\tLast Run"
+ scheduler_jobs = rufus_scheduler.all_jobs
+ scheduler_jobs.each do |k, v|
+ log! "#{v.t}\t#{v.last}\t"
+ end
+ end
+ end
+
# Pulls the schedule from Resque.schedule and loads it into the
# rufus scheduler instance
def load_schedule!
+ procline "Loading Schedule"
+
# Need to load the schedule from redis for the first time if dynamic
- Resque.reload_schedule! if dynamic
-
+ Resque.reload_schedule! if dynamic
+
log! "Schedule empty! Set Resque.schedule" if Resque.schedule.empty?
-
+
@@scheduled_jobs = {}
-
+
Resque.schedule.each do |name, config|
load_schedule_job(name, config)
end
Resque.redis.del(:schedules_changed)
procline "Schedules Loaded"
end
-
+
# Loads a job schedule into the Rufus::Scheduler and stores it in @@scheduled_jobs
def load_schedule_job(name, config)
# If rails_env is set in the config, enforce ENV['RAILS_ENV'] as
# required for the jobs to be scheduled. If rails_env is missing, the
# job should be scheduled regardless of what ENV['RAILS_ENV'] is set
@@ -126,11 +150,11 @@
enqueue_delayed_items_for_timestamp(timestamp)
timestamp = Resque.next_delayed_timestamp(at_time)
end
end
end
-
+
# Enqueues all delayed jobs for a timestamp
def enqueue_delayed_items_for_timestamp(timestamp)
item = nil
begin
handle_shutdown do
@@ -150,13 +174,16 @@
end
# Enqueues a job based on a config hash
def enqueue_from_config(job_config)
args = job_config['args'] || job_config[:args]
+
klass_name = job_config['class'] || job_config[:class]
+ klass = constantize(klass_name) rescue klass_name
+
params = args.is_a?(Hash) ? [args] : Array(args)
- queue = job_config['queue'] || job_config[:queue] || Resque.queue_from_class(constantize(klass_name))
+ queue = job_config['queue'] || job_config[:queue] || Resque.queue_from_class(klass)
# Support custom job classes like those that inherit from Resque::JobWithStatus (resque-status)
if (job_klass = job_config['custom_job_class']) && (job_klass != 'Resque::Job')
# The custom job class API must offer a static "scheduled" method. If the custom
# job class can not be constantized (via a requeue call from the web perhaps), fall
# back to enqueing normally via Resque::Job.create.
@@ -165,11 +192,18 @@
rescue NameError
# Note that the custom job class (job_config['custom_job_class']) is the one enqueued
Resque::Job.create(queue, job_klass, *params)
end
else
- Resque::Job.create(queue, klass_name, *params)
+ # hack to avoid havoc for people shoving stuff into queues
+ # for non-existent classes (for example: running scheduler in
+ # one app that schedules for another
+ if Class === klass
+ Resque.enqueue(klass, *params)
+ else
+ Resque::Job.create(queue, klass, *params)
+ end
end
rescue
log! "Failed to enqueue #{klass_name}:\n #{$!}"
end
@@ -183,17 +217,17 @@
rufus_scheduler.stop
@rufus_scheduler = nil
@@scheduled_jobs = {}
rufus_scheduler
end
-
+
def reload_schedule!
procline "Reloading Schedule"
clear_schedule!
load_schedule!
end
-
+
def update_schedule
if Resque.redis.scard(:schedules_changed) > 0
procline "Updating schedule"
Resque.reload_schedule!
while schedule_name = Resque.redis.spop(:schedules_changed)
@@ -205,11 +239,11 @@
end
end
procline "Schedules Loaded"
end
end
-
+
def unschedule_job(name)
if scheduled_jobs[name]
log "Removing schedule #{name}"
scheduled_jobs[name].unschedule
@@scheduled_jobs.delete(name)
@@ -217,11 +251,11 @@
end
# Sleeps and returns true
def poll_sleep
@sleeping = true
- handle_shutdown { sleep 5 }
+ handle_shutdown { sleep poll_sleep_amount }
@sleeping = false
true
end
# Sets the shutdown flag, exits if sleeping
@@ -236,10 +270,10 @@
def log(msg)
# add "verbose" logic later
log!(msg) if verbose
end
-
+
def procline(string)
log! string
$0 = "resque-scheduler-#{ResqueScheduler::Version}: #{string}"
end