lib/resque/scheduler.rb in resque-scheduler-2.5.5 vs lib/resque/scheduler.rb in resque-scheduler-3.0.0

- old
+ new

@@ -1,105 +1,35 @@ +# vim:fileencoding=utf-8 + require 'rufus/scheduler' -require 'resque/scheduler_locking' -require 'resque_scheduler/logger_builder' +require_relative 'scheduler/configuration' +require_relative 'scheduler/locking' +require_relative 'scheduler/logger_builder' +require_relative 'scheduler/signal_handling' module Resque - class Scheduler - extend Resque::SchedulerLocking + module Scheduler + autoload :Cli, 'resque/scheduler/cli' + autoload :Extension, 'resque/scheduler/extension' + autoload :Util, 'resque/scheduler/util' + autoload :VERSION, 'resque/scheduler/version' - class << self - # Allows for block-style configuration - def configure - yield self - end + private - attr_writer :signal_queue + extend Resque::Scheduler::Locking + extend Resque::Scheduler::Configuration + extend Resque::Scheduler::SignalHandling - def signal_queue - @signal_queue ||= [] - end + public - # Used in `#load_schedule_job` - attr_writer :env - - def env - return @env if @env - @env ||= Rails.env if defined?(Rails) - @env ||= ENV['RAILS_ENV'] - @env - end - - # If true, logs more stuff... - attr_writer :verbose - - def verbose - @verbose ||= !!ENV['VERBOSE'] - end - - # If set, produces no output - attr_writer :mute - - def mute - @mute ||= !!ENV['MUTE'] - end - - # If set, will write messages to the file - attr_writer :logfile - - def logfile - @logfile ||= ENV['LOGFILE'] - end - - # Sets whether to log in 'text' or 'json' - attr_writer :logformat - - def logformat - @logformat ||= ENV['LOGFORMAT'] - end - - # If set, will try to update the schedule in the loop - attr_writer :dynamic - - def dynamic - @dynamic ||= !!ENV['DYNAMIC_SCHEDULE'] - end - - # If set, will append the app name to procline - attr_writer :app_name - - def app_name - @app_name ||= ENV['APP_NAME'] - end - - # Amount of time in seconds to sleep between polls of the delayed - # queue. Defaults to 5 - attr_writer :poll_sleep_amount - - def poll_sleep_amount - @poll_sleep_amount ||= - Float(ENV.fetch('RESQUE_SCHEDULER_INTERVAL', '5')) - end - - attr_writer :logger - - def logger - @logger ||= ResqueScheduler::LoggerBuilder.new( - :mute => mute, - :verbose => verbose, - :log_dev => logfile, - :format => logformat - ).build - end - + class << self # the Rufus::Scheduler jobs that are scheduled - def scheduled_jobs - @@scheduled_jobs - end + attr_reader :scheduled_jobs # Schedule all jobs and continually look for delayed jobs (never returns) def run - $0 = "resque-scheduler: Starting" + procline 'Starting' # trap signals register_signal_handlers # Quote from the resque/worker. @@ -119,11 +49,11 @@ begin @th = Thread.current # Now start the scheduling part of the loop. loop do - if is_master? + if master? begin handle_delayed_items update_schedule if dynamic rescue Errno::EAGAIN, Errno::ECONNRESET => e log! e.message @@ -135,78 +65,56 @@ rescue Interrupt log 'Exiting' end end - # For all signals, set the shutdown flag and wait for current - # poll/enqueing to finish (should be almost instant). In the - # case of sleeping, exit immediately. - def register_signal_handlers - %w(INT TERM USR1 USR2 QUIT).each do |sig| - trap(sig) { signal_queue << sig } - end - end - - def handle_signals - loop do - sig = signal_queue.shift - break unless sig - log! "Got #{sig} signal" - case sig - when 'INT', 'TERM', 'QUIT' then shutdown - when 'USR1' then print_schedule - when 'USR2' then reload_schedule! - end - end - end - def print_schedule if rufus_scheduler log! "Scheduling Info\tLast Run" scheduler_jobs = rufus_scheduler.all_jobs - scheduler_jobs.each do |k, v| + scheduler_jobs.each do |_k, v| log! "#{v.t}\t#{v.last}\t" end end end # Pulls the schedule from Resque.schedule and loads it into the # rufus scheduler instance def load_schedule! - procline "Loading Schedule" + procline 'Loading Schedule' # Need to load the schedule from redis for the first time if dynamic Resque.reload_schedule! if dynamic - log! "Schedule empty! Set Resque.schedule" if Resque.schedule.empty? + log! 'Schedule empty! Set Resque.schedule' if Resque.schedule.empty? - @@scheduled_jobs = {} + @scheduled_jobs = {} Resque.schedule.each do |name, config| load_schedule_job(name, config) end Resque.redis.del(:schedules_changed) - procline "Schedules Loaded" + procline 'Schedules Loaded' end # modify interval type value to value with options if options available def optionizate_interval_value(value) args = value if args.is_a?(::Array) return args.first if args.size > 2 || !args.last.is_a?(::Hash) # symbolize keys of hash for options - args[1] = args[1].inject({}) do |m, i| + args[1] = args[1].reduce({}) do |m, i| key, value = i - m[(key.to_sym rescue key) || key] = value + m[(key.respond_to?(:to_sym) ? key.to_sym : key) || key] = value m end end args end - # Loads a job schedule into the Rufus::Scheduler and stores it in - # @@scheduled_jobs + # Loads a job schedule into the Rufus::Scheduler and stores it + # in @scheduled_jobs def load_schedule_job(name, config) # If `rails_env` or `env` is set in the config, load jobs only if they # are meant to be loaded in `Resque::Scheduler.env`. If `rails_env` or # `env` is missing, the job should be scheduled regardless of the value # of `Resque::Scheduler.env`. @@ -214,37 +122,40 @@ configured_env = config['rails_env'] || config['env'] if configured_env.nil? || env_matches?(configured_env) log! "Scheduling #{name} " interval_defined = false - interval_types = %w{cron every} + interval_types = %w(cron every) interval_types.each do |interval_type| if !config[interval_type].nil? && config[interval_type].length > 0 args = optionizate_interval_value(config[interval_type]) - @@scheduled_jobs[name] = rufus_scheduler.send(interval_type, *args) do - if is_master? + job = rufus_scheduler.send(interval_type, *args) do + if master? log! "queueing #{config['class']} (#{name})" + Resque.last_enqueued_at(name, Time.now.to_s) handle_errors { enqueue_from_config(config) } end end + @scheduled_jobs[name] = job interval_defined = true break end end unless interval_defined - log! "no #{interval_types.join(' / ')} found for #{config['class']} (#{name}) - skipping" + log! "no #{interval_types.join(' / ')} found for " \ + "#{config['class']} (#{name}) - skipping" end else - log "Skipping schedule of #{name} because configured " << - "env #{configured_env.inspect} does not match current " << + log "Skipping schedule of #{name} because configured " \ + "env #{configured_env.inspect} does not match current " \ "env #{env.inspect}" end end # Returns true if the given schedule config hash matches the current env def rails_env_matches?(config) - warn '`Resque::Scheduler.rails_env_matches?` is deprecated. ' << + warn '`Resque::Scheduler.rails_env_matches?` is deprecated. ' \ 'Please use `Resque::Scheduler.env_matches?` instead.' config['rails_env'] && env && config['rails_env'].split(/[\s,]+/).include?(env) end @@ -254,78 +165,97 @@ env && configured_env.split(/[\s,]+/).include?(env) end # Handles queueing delayed items # at_time - Time to start scheduling items (default: now). - def handle_delayed_items(at_time=nil) - if timestamp = Resque.next_delayed_timestamp(at_time) - procline "Processing Delayed Items" - while !timestamp.nil? + def handle_delayed_items(at_time = nil) + timestamp = Resque.next_delayed_timestamp(at_time) + if timestamp + procline 'Processing Delayed Items' + until timestamp.nil? enqueue_delayed_items_for_timestamp(timestamp) timestamp = Resque.next_delayed_timestamp(at_time) end end end # Enqueues all delayed jobs for a timestamp def enqueue_delayed_items_for_timestamp(timestamp) item = nil - begin + loop do handle_shutdown do # Continually check that it is still the master - if is_master? && item = Resque.next_item_for_timestamp(timestamp) - log "queuing #{item['class']} [delayed]" - handle_errors { enqueue_from_config(item) } + if master? + item = Resque.next_item_for_timestamp(timestamp) + if item + log "queuing #{item['class']} [delayed]" + handle_errors { enqueue_from_config(item) } + end end end - # continue processing until there are no more ready items in this timestamp - end while !item.nil? + # continue processing until there are no more ready items in this + # timestamp + break if item.nil? + end end def handle_shutdown exit if @shutdown yield exit if @shutdown end def handle_errors - begin - yield - rescue Exception => e - log_error "#{e.class.name}: #{e.message}" - end + yield + rescue Exception => e + log_error "#{e.class.name}: #{e.message}" end # Enqueues a job based on a config hash def enqueue_from_config(job_config) args = job_config['args'] || job_config[:args] klass_name = job_config['class'] || job_config[:class] - klass = ResqueScheduler::Util.constantize(klass_name) rescue klass_name + begin + klass = Resque::Scheduler::Util.constantize(klass_name) + rescue NameError + klass = klass_name + end params = args.is_a?(Hash) ? [args] : Array(args) - queue = job_config['queue'] || job_config[:queue] || Resque.queue_from_class(klass) - # Support custom job classes like those that inherit from Resque::JobWithStatus (resque-status) - if (job_klass = job_config['custom_job_class']) && (job_klass != 'Resque::Job') - # The custom job class API must offer a static "scheduled" method. If the custom - # job class can not be constantized (via a requeue call from the web perhaps), fall - # back to enqueing normally via Resque::Job.create. + queue = job_config['queue'] || + job_config[:queue] || + Resque.queue_from_class(klass) + # Support custom job classes like those that inherit from + # Resque::JobWithStatus (resque-status) + job_klass = job_config['custom_job_class'] + if job_klass && job_klass != 'Resque::Job' + # The custom job class API must offer a static "scheduled" method. If + # the custom job class can not be constantized (via a requeue call + # from the web perhaps), fall back to enqueing normally via + # Resque::Job.create. begin - ResqueScheduler::Util.constantize(job_klass).scheduled(queue, klass_name, *params) + Resque::Scheduler::Util.constantize(job_klass).scheduled( + queue, klass_name, *params + ) rescue NameError - # Note that the custom job class (job_config['custom_job_class']) is the one enqueued + # Note that the custom job class (job_config['custom_job_class']) + # is the one enqueued Resque::Job.create(queue, job_klass, *params) end else - # hack to avoid havoc for people shoving stuff into queues + # Hack to avoid havoc for people shoving stuff into queues # for non-existent classes (for example: running scheduler in - # one app that schedules for another + # one app that schedules for another. if Class === klass - ResqueScheduler::Plugin.run_before_delayed_enqueue_hooks(klass, *params) + Resque::Scheduler::Plugin.run_before_delayed_enqueue_hooks( + klass, *params + ) - # If the class is a custom job class, call self#scheduled on it. This allows you to do things like - # Resque.enqueue_at(timestamp, CustomJobClass). Otherwise, pass off to Resque. + # If the class is a custom job class, call self#scheduled on it. + # This allows you to do things like Resque.enqueue_at(timestamp, + # CustomJobClass). Otherwise, pass off to Resque. if klass.respond_to?(:scheduled) klass.scheduled(queue, klass_name, *params) else Resque.enqueue_to(queue, klass, *params) end @@ -344,41 +274,42 @@ # Stops old rufus scheduler and creates a new one. Returns the new # rufus scheduler def clear_schedule! rufus_scheduler.stop @rufus_scheduler = nil - @@scheduled_jobs = {} + @scheduled_jobs = {} rufus_scheduler end def reload_schedule! - procline "Reloading Schedule" + procline 'Reloading Schedule' clear_schedule! load_schedule! end def update_schedule if Resque.redis.scard(:schedules_changed) > 0 - procline "Updating schedule" - Resque.reload_schedule! - while schedule_name = Resque.redis.spop(:schedules_changed) - if Resque.schedule.keys.include?(schedule_name) + procline 'Updating schedule' + loop do + schedule_name = Resque.redis.spop(:schedules_changed) + break unless schedule_name + if Resque.reload_schedule!.keys.include?(schedule_name) unschedule_job(schedule_name) load_schedule_job(schedule_name, Resque.schedule[schedule_name]) else unschedule_job(schedule_name) end end - procline "Schedules Loaded" + procline 'Schedules Loaded' end end def unschedule_job(name) if scheduled_jobs[name] log "Removing schedule #{name}" scheduled_jobs[name].unschedule - @@scheduled_jobs.delete(name) + @scheduled_jobs.delete(name) end end # Sleeps and returns true def poll_sleep @@ -394,13 +325,15 @@ def poll_sleep_loop @sleeping = true start = Time.now loop do - break if (Time.now - start) >= poll_sleep_amount + elapsed_sleep = (Time.now - start) + remaining_sleep = poll_sleep_amount - elapsed_sleep + break if remaining_sleep <= 0 begin - sleep 0.01 + sleep(remaining_sleep) handle_signals rescue Interrupt if @shutdown Resque.clean_schedules release_master_lock! @@ -437,10 +370,21 @@ $0 = argv0 end private + attr_writer :logger + + def logger + @logger ||= Resque::Scheduler::LoggerBuilder.new( + quiet: quiet, + verbose: verbose, + log_dev: logfile, + format: logformat + ).build + end + def app_str app_name ? "[#{app_name}]" : '' end def env_str @@ -450,10 +394,10 @@ def build_procline(string) "#{internal_name}#{app_str}#{env_str}: #{string}" end def internal_name - "resque-scheduler-#{ResqueScheduler::VERSION}" + "resque-scheduler-#{Resque::Scheduler::VERSION}" end end end end