lib/resque/scheduler.rb in resque-scheduler-1.9.10 vs lib/resque/scheduler.rb in resque-scheduler-2.0.0.a

- old
+ new

@@ -9,26 +9,36 @@ class << self # If true, logs more stuff... attr_accessor :verbose - + # If set, produces no output attr_accessor :mute + + # If set, will try to update the schulde in the loop + attr_accessor :dynamic + + # the Rufus::Scheduler jobs that are scheduled + def scheduled_jobs + @@scheduled_jobs + end # Schedule all jobs and continually look for delayed jobs (never returns) def run - + $0 = "resque-scheduler: Starting" # trap signals register_signal_handlers # Load the schedule into rufus + procline "Loading Schedule" load_schedule! # Now start the scheduling part of the loop. loop do handle_delayed_items + update_schedule if dynamic poll_sleep end # never gets here. end @@ -37,47 +47,61 @@ # poll/enqueing to finish (should be almost istant). In the # case of sleeping, exit immediately. def register_signal_handlers trap("TERM") { shutdown } trap("INT") { shutdown } - + begin trap('QUIT') { shutdown } trap('USR1') { kill_child } + trap('USR2') { reload_schedule! } rescue ArgumentError - warn "Signals QUIT and USR1 not supported." + warn "Signals QUIT and USR1 and USR2 not supported." end end # Pulls the schedule from Resque.schedule and loads it into the # rufus scheduler instance def load_schedule! log! "Schedule empty! Set Resque.schedule" if Resque.schedule.empty? - + + @@scheduled_jobs = {} + Resque.schedule.each do |name, config| - # If rails_env is set in the config, enforce ENV['RAILS_ENV'] as - # required for the jobs to be scheduled. If rails_env is missing, the - # job should be scheduled regardless of what ENV['RAILS_ENV'] is set - # to. - if config['rails_env'].nil? || rails_env_matches?(config) - log! "Scheduling #{name} " - interval_defined = false - interval_types = %w{cron every} - interval_types.each do |interval_type| - if !config[interval_type].nil? && config[interval_type].length > 0 - rufus_scheduler.send(interval_type, config[interval_type]) do + load_schedule_job(name, config) + end + Resque.redis.del(:schedules_changed) + procline "Schedules Loaded" + end + + # Loads a job schedule into the Rufus::Scheduler and stores it in @@scheduled_jobs + def load_schedule_job(name, config) + # If rails_env is set in the config, enforce ENV['RAILS_ENV'] as + # required for the jobs to be scheduled. If rails_env is missing, the + # job should be scheduled regardless of what ENV['RAILS_ENV'] is set + # to. + if config['rails_env'].nil? || rails_env_matches?(config) + log! "Scheduling #{name} " + interval_defined = false + interval_types = %w{cron every} + interval_types.each do |interval_type| + if !config[interval_type].nil? && config[interval_type].length > 0 + begin + @@scheduled_jobs[name] = rufus_scheduler.send(interval_type, config[interval_type]) do log! "queueing #{config['class']} (#{name})" enqueue_from_config(config) end - interval_defined = true - break + rescue Exception => e + log! "#{e.class.name}: #{e.message}" end + interval_defined = true + break end - unless interval_defined - log! "no #{interval_types.join(' / ')} found for #{config['class']} (#{name}) - skipping" - end end + unless interval_defined + log! "no #{interval_types.join(' / ')} found for #{config['class']} (#{name}) - skipping" + end end end # Returns true if the given schedule config hash matches the current # ENV['RAILS_ENV'] @@ -86,37 +110,34 @@ end # Handles queueing delayed items # at_time - Time to start scheduling items (default: now). def handle_delayed_items(at_time=nil) - timestamp = nil - begin - if timestamp = Resque.next_delayed_timestamp(at_time) + item = nil + if timestamp = Resque.next_delayed_timestamp(at_time) + procline "Processing Delayed Items" + while !timestamp.nil? enqueue_delayed_items_for_timestamp(timestamp) + timestamp = Resque.next_delayed_timestamp(at_time) end - # continue processing until there are no more ready timestamps - end while !timestamp.nil? + end end - + # Enqueues all delayed jobs for a timestamp def enqueue_delayed_items_for_timestamp(timestamp) item = nil begin handle_shutdown do if item = Resque.next_item_for_timestamp(timestamp) - begin - log "queuing #{item['class']} [delayed]" - queue = item['queue'] || Resque.queue_from_class(constantize(item['class'])) - # Support custom job classes like job with status - if (job_klass = item['custom_job_class']) && (job_klass != 'Resque::Job') - # custom job classes not supporting the same API calls must implement the #schedule method - constantize(job_klass).scheduled(queue, item['class'], *item['args']) - else - Resque::Job.create(queue, item['class'], *item['args']) - end - rescue - log! "Failed to enqueue #{klass_name}:\n #{$!}" + log "queuing #{item['class']} [delayed]" + queue = item['queue'] || Resque.queue_from_class(constantize(item['class'])) + # Support custom job classes like job with status + if (job_klass = item['custom_job_class']) && (job_klass != 'Resque::Job') + # custom job classes not supporting the same API calls must implement the #schedule method + constantize(job_klass).scheduled(queue, item['class'], *item['args']) + else + Resque::Job.create(queue, item['class'], *item['args']) end end end # continue processing until there are no more ready items in this timestamp end while !item.nil? @@ -138,13 +159,11 @@ if (job_klass = config['custom_job_class']) && (job_klass != 'Resque::Job') # custom job classes not supporting the same API calls must implement the #schedule method constantize(job_klass).scheduled(queue, klass_name, *params) else Resque::Job.create(queue, klass_name, *params) - end - rescue - log! "Failed to enqueue #{klass_name}:\n #{$!}" + end end def rufus_scheduler @rufus_scheduler ||= Rufus::Scheduler.start_new end @@ -152,12 +171,44 @@ # Stops old rufus scheduler and creates a new one. Returns the new # rufus scheduler def clear_schedule! rufus_scheduler.stop @rufus_scheduler = nil + @@scheduled_jobs = {} rufus_scheduler end + + def reload_schedule! + procline "Reloading Schedule" + clear_schedule! + Resque.reload_schedule! + load_schedule! + end + + def update_schedule + if Resque.redis.scard(:schedules_changed) > 0 + procline "Updating schedule" + Resque.reload_schedule! + while schedule_name = Resque.redis.spop(:schedules_changed) + if Resque.schedule.keys.include?(schedule_name) + unschedule_job(schedule_name) + load_schedule_job(schedule_name, Resque.schedule[schedule_name]) + else + unschedule_job(schedule_name) + end + end + procline "Schedules Loaded" + end + end + + def unschedule_job(name) + if scheduled_jobs[name] + log "Removing schedule #{name}" + scheduled_jobs[name].unschedule + @@scheduled_jobs.delete(name) + end + end # Sleeps and returns true def poll_sleep @sleeping = true handle_shutdown { sleep 5 } @@ -176,9 +227,14 @@ end def log(msg) # add "verbose" logic later log!(msg) if verbose + end + + def procline(string) + $0 = "resque-scheduler-#{ResqueScheduler::Version}: #{string}" + log! $0 end end end