lib/resque/scheduler.rb in resque-scheduler-1.9.10 vs lib/resque/scheduler.rb in resque-scheduler-2.0.0.a
- old
+ new
@@ -9,26 +9,36 @@
class << self
# If true, logs more stuff...
attr_accessor :verbose
-
+
# If set, produces no output
attr_accessor :mute
+
+ # If set, will try to update the schulde in the loop
+ attr_accessor :dynamic
+
+ # the Rufus::Scheduler jobs that are scheduled
+ def scheduled_jobs
+ @@scheduled_jobs
+ end
# Schedule all jobs and continually look for delayed jobs (never returns)
def run
-
+ $0 = "resque-scheduler: Starting"
# trap signals
register_signal_handlers
# Load the schedule into rufus
+ procline "Loading Schedule"
load_schedule!
# Now start the scheduling part of the loop.
loop do
handle_delayed_items
+ update_schedule if dynamic
poll_sleep
end
# never gets here.
end
@@ -37,47 +47,61 @@
# poll/enqueing to finish (should be almost istant). In the
# case of sleeping, exit immediately.
def register_signal_handlers
trap("TERM") { shutdown }
trap("INT") { shutdown }
-
+
begin
trap('QUIT') { shutdown }
trap('USR1') { kill_child }
+ trap('USR2') { reload_schedule! }
rescue ArgumentError
- warn "Signals QUIT and USR1 not supported."
+ warn "Signals QUIT and USR1 and USR2 not supported."
end
end
# Pulls the schedule from Resque.schedule and loads it into the
# rufus scheduler instance
def load_schedule!
log! "Schedule empty! Set Resque.schedule" if Resque.schedule.empty?
-
+
+ @@scheduled_jobs = {}
+
Resque.schedule.each do |name, config|
- # If rails_env is set in the config, enforce ENV['RAILS_ENV'] as
- # required for the jobs to be scheduled. If rails_env is missing, the
- # job should be scheduled regardless of what ENV['RAILS_ENV'] is set
- # to.
- if config['rails_env'].nil? || rails_env_matches?(config)
- log! "Scheduling #{name} "
- interval_defined = false
- interval_types = %w{cron every}
- interval_types.each do |interval_type|
- if !config[interval_type].nil? && config[interval_type].length > 0
- rufus_scheduler.send(interval_type, config[interval_type]) do
+ load_schedule_job(name, config)
+ end
+ Resque.redis.del(:schedules_changed)
+ procline "Schedules Loaded"
+ end
+
+ # Loads a job schedule into the Rufus::Scheduler and stores it in @@scheduled_jobs
+ def load_schedule_job(name, config)
+ # If rails_env is set in the config, enforce ENV['RAILS_ENV'] as
+ # required for the jobs to be scheduled. If rails_env is missing, the
+ # job should be scheduled regardless of what ENV['RAILS_ENV'] is set
+ # to.
+ if config['rails_env'].nil? || rails_env_matches?(config)
+ log! "Scheduling #{name} "
+ interval_defined = false
+ interval_types = %w{cron every}
+ interval_types.each do |interval_type|
+ if !config[interval_type].nil? && config[interval_type].length > 0
+ begin
+ @@scheduled_jobs[name] = rufus_scheduler.send(interval_type, config[interval_type]) do
log! "queueing #{config['class']} (#{name})"
enqueue_from_config(config)
end
- interval_defined = true
- break
+ rescue Exception => e
+ log! "#{e.class.name}: #{e.message}"
end
+ interval_defined = true
+ break
end
- unless interval_defined
- log! "no #{interval_types.join(' / ')} found for #{config['class']} (#{name}) - skipping"
- end
end
+ unless interval_defined
+ log! "no #{interval_types.join(' / ')} found for #{config['class']} (#{name}) - skipping"
+ end
end
end
# Returns true if the given schedule config hash matches the current
# ENV['RAILS_ENV']
@@ -86,37 +110,34 @@
end
# Handles queueing delayed items
# at_time - Time to start scheduling items (default: now).
def handle_delayed_items(at_time=nil)
- timestamp = nil
- begin
- if timestamp = Resque.next_delayed_timestamp(at_time)
+ item = nil
+ if timestamp = Resque.next_delayed_timestamp(at_time)
+ procline "Processing Delayed Items"
+ while !timestamp.nil?
enqueue_delayed_items_for_timestamp(timestamp)
+ timestamp = Resque.next_delayed_timestamp(at_time)
end
- # continue processing until there are no more ready timestamps
- end while !timestamp.nil?
+ end
end
-
+
# Enqueues all delayed jobs for a timestamp
def enqueue_delayed_items_for_timestamp(timestamp)
item = nil
begin
handle_shutdown do
if item = Resque.next_item_for_timestamp(timestamp)
- begin
- log "queuing #{item['class']} [delayed]"
- queue = item['queue'] || Resque.queue_from_class(constantize(item['class']))
- # Support custom job classes like job with status
- if (job_klass = item['custom_job_class']) && (job_klass != 'Resque::Job')
- # custom job classes not supporting the same API calls must implement the #schedule method
- constantize(job_klass).scheduled(queue, item['class'], *item['args'])
- else
- Resque::Job.create(queue, item['class'], *item['args'])
- end
- rescue
- log! "Failed to enqueue #{klass_name}:\n #{$!}"
+ log "queuing #{item['class']} [delayed]"
+ queue = item['queue'] || Resque.queue_from_class(constantize(item['class']))
+ # Support custom job classes like job with status
+ if (job_klass = item['custom_job_class']) && (job_klass != 'Resque::Job')
+ # custom job classes not supporting the same API calls must implement the #schedule method
+ constantize(job_klass).scheduled(queue, item['class'], *item['args'])
+ else
+ Resque::Job.create(queue, item['class'], *item['args'])
end
end
end
# continue processing until there are no more ready items in this timestamp
end while !item.nil?
@@ -138,13 +159,11 @@
if (job_klass = config['custom_job_class']) && (job_klass != 'Resque::Job')
# custom job classes not supporting the same API calls must implement the #schedule method
constantize(job_klass).scheduled(queue, klass_name, *params)
else
Resque::Job.create(queue, klass_name, *params)
- end
- rescue
- log! "Failed to enqueue #{klass_name}:\n #{$!}"
+ end
end
def rufus_scheduler
@rufus_scheduler ||= Rufus::Scheduler.start_new
end
@@ -152,12 +171,44 @@
# Stops old rufus scheduler and creates a new one. Returns the new
# rufus scheduler
def clear_schedule!
rufus_scheduler.stop
@rufus_scheduler = nil
+ @@scheduled_jobs = {}
rufus_scheduler
end
+
+ def reload_schedule!
+ procline "Reloading Schedule"
+ clear_schedule!
+ Resque.reload_schedule!
+ load_schedule!
+ end
+
+ def update_schedule
+ if Resque.redis.scard(:schedules_changed) > 0
+ procline "Updating schedule"
+ Resque.reload_schedule!
+ while schedule_name = Resque.redis.spop(:schedules_changed)
+ if Resque.schedule.keys.include?(schedule_name)
+ unschedule_job(schedule_name)
+ load_schedule_job(schedule_name, Resque.schedule[schedule_name])
+ else
+ unschedule_job(schedule_name)
+ end
+ end
+ procline "Schedules Loaded"
+ end
+ end
+
+ def unschedule_job(name)
+ if scheduled_jobs[name]
+ log "Removing schedule #{name}"
+ scheduled_jobs[name].unschedule
+ @@scheduled_jobs.delete(name)
+ end
+ end
# Sleeps and returns true
def poll_sleep
@sleeping = true
handle_shutdown { sleep 5 }
@@ -176,9 +227,14 @@
end
def log(msg)
# add "verbose" logic later
log!(msg) if verbose
+ end
+
+ def procline(string)
+ $0 = "resque-scheduler-#{ResqueScheduler::Version}: #{string}"
+ log! $0
end
end
end