lib/resque/scheduler.rb in resque-scheduler-2.0.0.d vs lib/resque/scheduler.rb in resque-scheduler-2.0.0.e

- old
+ new

@@ -9,31 +9,43 @@ class << self # If true, logs more stuff... attr_accessor :verbose - + # If set, produces no output attr_accessor :mute - + # If set, will try to update the schulde in the loop attr_accessor :dynamic + # Amount of time in seconds to sleep between polls of the delayed + # queue. Defaults to 5 + attr_writer :poll_sleep_amount + # the Rufus::Scheduler jobs that are scheduled def scheduled_jobs @@scheduled_jobs end + + def poll_sleep_amount + @poll_sleep_amount ||= 5 # seconds + end # Schedule all jobs and continually look for delayed jobs (never returns) def run $0 = "resque-scheduler: Starting" # trap signals register_signal_handlers # Load the schedule into rufus - procline "Loading Schedule" - load_schedule! + # If dynamic is set, load that schedule otherwise use normal load + if dynamic + reload_schedule! + else + load_schedule! + end # Now start the scheduling part of the loop. loop do begin handle_delayed_items @@ -51,37 +63,49 @@ # poll/enqueing to finish (should be almost istant). In the # case of sleeping, exit immediately. def register_signal_handlers trap("TERM") { shutdown } trap("INT") { shutdown } - + begin - trap('QUIT') { shutdown } - trap('USR1') { kill_child } + trap('QUIT') { shutdown } + trap('USR1') { print_schedule } trap('USR2') { reload_schedule! } rescue ArgumentError warn "Signals QUIT and USR1 and USR2 not supported." end end + def print_schedule + if rufus_scheduler + log! "Scheduling Info\tLast Run" + scheduler_jobs = rufus_scheduler.all_jobs + scheduler_jobs.each do |k, v| + log! "#{v.t}\t#{v.last}\t" + end + end + end + # Pulls the schedule from Resque.schedule and loads it into the # rufus scheduler instance def load_schedule! + procline "Loading Schedule" + # Need to load the schedule from redis for the first time if dynamic - Resque.reload_schedule! if dynamic - + Resque.reload_schedule! if dynamic + log! "Schedule empty! Set Resque.schedule" if Resque.schedule.empty? - + @@scheduled_jobs = {} - + Resque.schedule.each do |name, config| load_schedule_job(name, config) end Resque.redis.del(:schedules_changed) procline "Schedules Loaded" end - + # Loads a job schedule into the Rufus::Scheduler and stores it in @@scheduled_jobs def load_schedule_job(name, config) # If rails_env is set in the config, enforce ENV['RAILS_ENV'] as # required for the jobs to be scheduled. If rails_env is missing, the # job should be scheduled regardless of what ENV['RAILS_ENV'] is set @@ -126,11 +150,11 @@ enqueue_delayed_items_for_timestamp(timestamp) timestamp = Resque.next_delayed_timestamp(at_time) end end end - + # Enqueues all delayed jobs for a timestamp def enqueue_delayed_items_for_timestamp(timestamp) item = nil begin handle_shutdown do @@ -150,13 +174,16 @@ end # Enqueues a job based on a config hash def enqueue_from_config(job_config) args = job_config['args'] || job_config[:args] + klass_name = job_config['class'] || job_config[:class] + klass = constantize(klass_name) rescue klass_name + params = args.is_a?(Hash) ? [args] : Array(args) - queue = job_config['queue'] || job_config[:queue] || Resque.queue_from_class(constantize(klass_name)) + queue = job_config['queue'] || job_config[:queue] || Resque.queue_from_class(klass) # Support custom job classes like those that inherit from Resque::JobWithStatus (resque-status) if (job_klass = job_config['custom_job_class']) && (job_klass != 'Resque::Job') # The custom job class API must offer a static "scheduled" method. If the custom # job class can not be constantized (via a requeue call from the web perhaps), fall # back to enqueing normally via Resque::Job.create. @@ -165,11 +192,18 @@ rescue NameError # Note that the custom job class (job_config['custom_job_class']) is the one enqueued Resque::Job.create(queue, job_klass, *params) end else - Resque::Job.create(queue, klass_name, *params) + # hack to avoid havoc for people shoving stuff into queues + # for non-existent classes (for example: running scheduler in + # one app that schedules for another + if Class === klass + Resque.enqueue(klass, *params) + else + Resque::Job.create(queue, klass, *params) + end end rescue log! "Failed to enqueue #{klass_name}:\n #{$!}" end @@ -183,17 +217,17 @@ rufus_scheduler.stop @rufus_scheduler = nil @@scheduled_jobs = {} rufus_scheduler end - + def reload_schedule! procline "Reloading Schedule" clear_schedule! load_schedule! end - + def update_schedule if Resque.redis.scard(:schedules_changed) > 0 procline "Updating schedule" Resque.reload_schedule! while schedule_name = Resque.redis.spop(:schedules_changed) @@ -205,11 +239,11 @@ end end procline "Schedules Loaded" end end - + def unschedule_job(name) if scheduled_jobs[name] log "Removing schedule #{name}" scheduled_jobs[name].unschedule @@scheduled_jobs.delete(name) @@ -217,11 +251,11 @@ end # Sleeps and returns true def poll_sleep @sleeping = true - handle_shutdown { sleep 5 } + handle_shutdown { sleep poll_sleep_amount } @sleeping = false true end # Sets the shutdown flag, exits if sleeping @@ -236,10 +270,10 @@ def log(msg) # add "verbose" logic later log!(msg) if verbose end - + def procline(string) log! string $0 = "resque-scheduler-#{ResqueScheduler::Version}: #{string}" end