lib/resque/scheduler.rb in sskirby-resque-scheduler-1.10.9 vs lib/resque/scheduler.rb in sskirby-resque-scheduler-1.10.13

- old
+ new

@@ -23,15 +23,16 @@ @@scheduled_jobs end # Schedule all jobs and continually look for delayed jobs (never returns) def run - + $0 = "resque-scheduler: Starting" # trap signals register_signal_handlers # Load the schedule into rufus + procline "Loading Schedule" load_schedule! # Now start the scheduling part of the loop. loop do handle_delayed_items @@ -66,46 +67,59 @@ @@scheduled_jobs = {} Resque.schedule.each do |name, config| load_schedule_job(name, config) end + Resque.redis.del(:schedules_changed) + procline "Schedules Loaded" end # Loads a job schedule into the Rufus::Scheduler and stores it in @@scheduled_jobs def load_schedule_job(name, config) # If rails_env is set in the config, enforce ENV['RAILS_ENV'] as # required for the jobs to be scheduled. If rails_env is missing, the # job should be scheduled regardless of what ENV['RAILS_ENV'] is set # to. if config['rails_env'].nil? || rails_env_matches?(config) log! "Scheduling #{name} " - if !config['cron'].nil? && config['cron'].length > 0 - @@scheduled_jobs[name] = rufus_scheduler.cron config['cron'] do - log! "queuing #{config['class']} (#{name})" - enqueue_from_config(config) + interval_defined = false + interval_types = %w{cron every} + interval_types.each do |interval_type| + if !config[interval_type].nil? && config[interval_type].length > 0 + begin + @@scheduled_jobs[name] = rufus_scheduler.send(interval_type, config[interval_type]) do + log! "queueing #{config['class']} (#{name})" + enqueue_from_config(config) + end + rescue Exception => e + log! "#{e.class.name}: #{e.message}" + end + interval_defined = true + break end - else - log! "no cron found for #{config['class']} (#{name}) - skipping" end + unless interval_defined + log! "no #{interval_types.join(' / ')} found for #{config['class']} (#{name}) - skipping" + end end end # Returns true if the given schedule config hash matches the current # ENV['RAILS_ENV'] def rails_env_matches?(config) config['rails_env'] && ENV['RAILS_ENV'] && config['rails_env'].gsub(/\s/,'').split(',').include?(ENV['RAILS_ENV']) end # Handles queueing delayed items - def handle_delayed_items - item = nil - begin - if timestamp = Resque.next_delayed_timestamp + def handle_delayed_items(at_time = nil) + if timestamp = Resque.next_delayed_timestamp(at_time) + procline "Processing Delayed Items" + while !timestamp.nil? enqueue_delayed_items_for_timestamp(timestamp) + timestamp = Resque.next_delayed_timestamp(at_time) end - # continue processing until there are no more ready timestamps - end while !timestamp.nil? + end end # Enqueues all delayed jobs for a timestamp def enqueue_delayed_items_for_timestamp(timestamp) item = nil @@ -162,36 +176,30 @@ @@scheduled_jobs = {} rufus_scheduler end def reload_schedule! - log! "Reloading Schedule..." + procline "Reloading Schedule" clear_schedule! Resque.reload_schedule! load_schedule! end def update_schedule - schedule_from_redis = Resque.get_schedules - if !schedule_from_redis.nil? && schedule_from_redis != Resque.schedule - log "Updating schedule..." - # unload schedules that no longer exist - (Resque.schedule.keys - schedule_from_redis.keys).each do |name| - unschedule_job(name) - end - - # find changes and stop and reload or add new - schedule_from_redis.each do |name, config| - if (Resque.schedule[name].nil? || Resque.schedule[name].empty?) || (config != Resque.schedule[name]) - unschedule_job(name) - load_schedule_job(name, config) + if Resque.redis.scard(:schedules_changed) > 0 + procline "Updating schedule" + Resque.reload_schedule! + while schedule_name = Resque.redis.spop(:schedules_changed) + if Resque.schedule.keys.include?(schedule_name) + unschedule_job(schedule_name) + load_schedule_job(schedule_name, Resque.schedule[schedule_name]) + else + unschedule_job(schedule_name) end end - - # load new schedule into Resque.schedule - Resque.schedule = schedule_from_redis end + procline "Schedules Loaded" end def unschedule_job(name) if scheduled_jobs[name] log "Removing schedule #{name}" @@ -219,9 +227,14 @@ end def log(msg) # add "verbose" logic later log!(msg) if verbose + end + + def procline(string) + $0 = "resque-scheduler-#{ResqueScheduler::Version}: #{string}" + log! $0 end end end