lib/resque/scheduler.rb in sskirby-resque-scheduler-1.10.9 vs lib/resque/scheduler.rb in sskirby-resque-scheduler-1.10.13
- old
+ new
@@ -23,15 +23,16 @@
@@scheduled_jobs
end
# Schedule all jobs and continually look for delayed jobs (never returns)
def run
-
+ $0 = "resque-scheduler: Starting"
# trap signals
register_signal_handlers
# Load the schedule into rufus
+ procline "Loading Schedule"
load_schedule!
# Now start the scheduling part of the loop.
loop do
handle_delayed_items
@@ -66,46 +67,59 @@
@@scheduled_jobs = {}
Resque.schedule.each do |name, config|
load_schedule_job(name, config)
end
+ Resque.redis.del(:schedules_changed)
+ procline "Schedules Loaded"
end
# Loads a job schedule into the Rufus::Scheduler and stores it in @@scheduled_jobs
def load_schedule_job(name, config)
# If rails_env is set in the config, enforce ENV['RAILS_ENV'] as
# required for the jobs to be scheduled. If rails_env is missing, the
# job should be scheduled regardless of what ENV['RAILS_ENV'] is set
# to.
if config['rails_env'].nil? || rails_env_matches?(config)
log! "Scheduling #{name} "
- if !config['cron'].nil? && config['cron'].length > 0
- @@scheduled_jobs[name] = rufus_scheduler.cron config['cron'] do
- log! "queuing #{config['class']} (#{name})"
- enqueue_from_config(config)
+ interval_defined = false
+ interval_types = %w{cron every}
+ interval_types.each do |interval_type|
+ if !config[interval_type].nil? && config[interval_type].length > 0
+ begin
+ @@scheduled_jobs[name] = rufus_scheduler.send(interval_type, config[interval_type]) do
+ log! "queueing #{config['class']} (#{name})"
+ enqueue_from_config(config)
+ end
+ rescue Exception => e
+ log! "#{e.class.name}: #{e.message}"
+ end
+ interval_defined = true
+ break
end
- else
- log! "no cron found for #{config['class']} (#{name}) - skipping"
end
+ unless interval_defined
+ log! "no #{interval_types.join(' / ')} found for #{config['class']} (#{name}) - skipping"
+ end
end
end
# Returns true if the given schedule config hash matches the current
# ENV['RAILS_ENV']
def rails_env_matches?(config)
config['rails_env'] && ENV['RAILS_ENV'] && config['rails_env'].gsub(/\s/,'').split(',').include?(ENV['RAILS_ENV'])
end
# Handles queueing delayed items
- def handle_delayed_items
- item = nil
- begin
- if timestamp = Resque.next_delayed_timestamp
+ def handle_delayed_items(at_time = nil)
+ if timestamp = Resque.next_delayed_timestamp(at_time)
+ procline "Processing Delayed Items"
+ while !timestamp.nil?
enqueue_delayed_items_for_timestamp(timestamp)
+ timestamp = Resque.next_delayed_timestamp(at_time)
end
- # continue processing until there are no more ready timestamps
- end while !timestamp.nil?
+ end
end
# Enqueues all delayed jobs for a timestamp
def enqueue_delayed_items_for_timestamp(timestamp)
item = nil
@@ -162,36 +176,30 @@
@@scheduled_jobs = {}
rufus_scheduler
end
def reload_schedule!
- log! "Reloading Schedule..."
+ procline "Reloading Schedule"
clear_schedule!
Resque.reload_schedule!
load_schedule!
end
def update_schedule
- schedule_from_redis = Resque.get_schedules
- if !schedule_from_redis.nil? && schedule_from_redis != Resque.schedule
- log "Updating schedule..."
- # unload schedules that no longer exist
- (Resque.schedule.keys - schedule_from_redis.keys).each do |name|
- unschedule_job(name)
- end
-
- # find changes and stop and reload or add new
- schedule_from_redis.each do |name, config|
- if (Resque.schedule[name].nil? || Resque.schedule[name].empty?) || (config != Resque.schedule[name])
- unschedule_job(name)
- load_schedule_job(name, config)
+ if Resque.redis.scard(:schedules_changed) > 0
+ procline "Updating schedule"
+ Resque.reload_schedule!
+ while schedule_name = Resque.redis.spop(:schedules_changed)
+ if Resque.schedule.keys.include?(schedule_name)
+ unschedule_job(schedule_name)
+ load_schedule_job(schedule_name, Resque.schedule[schedule_name])
+ else
+ unschedule_job(schedule_name)
end
end
-
- # load new schedule into Resque.schedule
- Resque.schedule = schedule_from_redis
end
+ procline "Schedules Loaded"
end
def unschedule_job(name)
if scheduled_jobs[name]
log "Removing schedule #{name}"
@@ -219,9 +227,14 @@
end
def log(msg)
# add "verbose" logic later
log!(msg) if verbose
+ end
+
+ def procline(string)
+ $0 = "resque-scheduler-#{ResqueScheduler::Version}: #{string}"
+ log! $0
end
end
end