# vim:fileencoding=utf-8

require 'rufus/scheduler'
require_relative 'scheduler/configuration'
require_relative 'scheduler/locking'
require_relative 'scheduler/logger_builder'
require_relative 'scheduler/signal_handling'
require_relative 'scheduler/failure_handler'

module Resque
  # Loads the recurring schedule into a Rufus::Scheduler instance and polls
  # for delayed items, enqueueing both into Resque while this process
  # reports itself as master.
  module Scheduler
    autoload :Cli, 'resque/scheduler/cli'
    autoload :Extension, 'resque/scheduler/extension'
    autoload :Util, 'resque/scheduler/util'
    autoload :VERSION, 'resque/scheduler/version'

    # Errors rescued inside the main loop of `run`: they are logged and the
    # master lock is released instead of terminating the loop.
    INTERMITTENT_ERRORS = [
      Errno::EAGAIN,
      Errno::ECONNRESET,
      Redis::CannotConnectError,
      Redis::TimeoutError
    ].freeze

    # NOTE(review): a bare `private`/`public` only changes the default
    # visibility of methods defined afterwards with `def`, so it has no
    # effect on the `extend` calls between them; kept as in the original.
    private

    extend Resque::Scheduler::Locking
    extend Resque::Scheduler::Configuration
    extend Resque::Scheduler::SignalHandling

    public

    class << self
      attr_writer :logger

      # the Rufus::Scheduler jobs that are scheduled
      attr_reader :scheduled_jobs

      # allow user to set an additional failure handler
      attr_writer :failure_handler

      # Schedule all jobs and continually look for delayed jobs. The loop
      # only ends when an Interrupt reaches this method (logged as
      # 'Exiting') or when `handle_shutdown` calls `exit`.
      def run
        procline 'Starting'

        # trap signals
        register_signal_handlers

        # Quote from the resque/worker.
        # Fix buffering so we can `rake resque:scheduler > scheduler.log` and
        # get output from the child in there.
        $stdout.sync = true
        $stderr.sync = true

        was_master = nil

        begin
          # Remembered so `shutdown` can raise Interrupt in this thread.
          @th = Thread.current

          # Now start the scheduling part of the loop.
          loop do
            begin
              # Check on changes to master/child
              @am_master = master?
              if am_master != was_master
                procline am_master ? 'Master scheduler' : 'Child scheduler'

                # Load schedule because changed
                reload_schedule!
              end

              if am_master
                handle_delayed_items
                update_schedule if dynamic
              end
              was_master = am_master
            rescue *INTERMITTENT_ERRORS => e
              log! e.message
              release_master_lock
            end
            poll_sleep
          end

        rescue Interrupt
          log 'Exiting'
        end
      end

      # Logs (at info level) one line per job currently known to the rufus
      # scheduler, preceded by a header line.
      def print_schedule
        if rufus_scheduler
          log! "Scheduling Info\tLast Run"
          scheduler_jobs = rufus_scheduler.jobs
          scheduler_jobs.each do |_k, v|
            log! "#{v.t}\t#{v.last}\t"
          end
        end
      end

      # Pulls the schedule from Resque.schedule and loads it into the
      # rufus scheduler instance
      def load_schedule!
        procline 'Loading Schedule'

        # Need to load the schedule from redis for the first time if dynamic
        Resque.reload_schedule! if dynamic

        log! 'Schedule empty! Set Resque.schedule' if Resque.schedule.empty?

        @scheduled_jobs = {}

        Resque.schedule.each do |name, config|
          load_schedule_job(name, config)
        end
        # Everything was just loaded, so pending change notifications are
        # dropped.
        Resque.redis.del(:schedules_changed) if am_master && dynamic
        procline 'Schedules Loaded'
      end

      # Modify interval type value to value with options if options
      # available.
      #
      # value - the configured `cron`/`every` value: either a plain value
      #         (returned unchanged) or an Array.
      #
      # For a two-element Array whose last element is a Hash, returns a NEW
      # array `[interval, nil, options]` where `options` is that Hash with
      # its keys symbolized plus `job: true`. For any other Array (more than
      # two elements, or last element not a Hash) returns its first element.
      #
      # The input is never mutated: the same config object is passed in
      # again whenever the schedule is reloaded, and rewriting it in place
      # would turn it into a three-element array whose options are then
      # discarded by the size check on the next load.
      def optionizate_interval_value(value)
        return value unless value.is_a?(::Array)
        return value.first if value.size > 2 || !value.last.is_a?(::Hash)

        # symbolize keys of hash for options
        options = value[1].each_with_object({}) do |(key, val), memo|
          memo[(key.respond_to?(:to_sym) ? key.to_sym : key) || key] = val
        end
        options[:job] = true

        [value.first, nil, options]
      end

      # Loads a job schedule into the Rufus::Scheduler and stores it
      # in @scheduled_jobs
      #
      # name   - schedule name, used as the key in @scheduled_jobs.
      # config - Hash of job settings; reads 'rails_env', 'env', 'cron',
      #          'every' and 'class'.
      def load_schedule_job(name, config)
        # If `rails_env` or `env` is set in the config, load jobs only if they
        # are meant to be loaded in `Resque::Scheduler.env`. If `rails_env` or
        # `env` is missing, the job should be scheduled regardless of the value
        # of `Resque::Scheduler.env`.

        configured_env = config['rails_env'] || config['env']

        if configured_env.nil? || env_matches?(configured_env)
          log! "Scheduling #{name} "
          interval_defined = false
          interval_types = %w(cron every)
          # The first non-empty interval type wins; `cron` is tried first.
          interval_types.each do |interval_type|
            next unless !config[interval_type].nil? && !config[interval_type].empty?
            args = optionizate_interval_value(config[interval_type])
            args = [args, nil, job: true] if args.is_a?(::String)

            job = rufus_scheduler.send(interval_type, *args) do
              enqueue_recurring(name, config)
            end
            @scheduled_jobs[name] = job
            interval_defined = true
            break
          end
          unless interval_defined
            log! "no #{interval_types.join(' / ')} found for " \
                 "#{config['class']} (#{name}) - skipping"
          end
        else
          log "Skipping schedule of #{name} because configured " \
              "env #{configured_env.inspect} does not match current " \
              "env #{env.inspect}"
        end
      end

      # Returns true if the given schedule config hash matches the current env
      #
      # Deprecated: emits a warning on every call; use `env_matches?`.
      def rails_env_matches?(config)
        warn '`Resque::Scheduler.rails_env_matches?` is deprecated. ' \
             'Please use `Resque::Scheduler.env_matches?` instead.'
        config['rails_env'] && env &&
          config['rails_env'].split(/[\s,]+/).include?(env)
      end

      # Returns true if the current env is non-nil and the configured env
      # (a string split on commas and/or whitespace) includes the current
      # env.
      def env_matches?(configured_env)
        env && configured_env.split(/[\s,]+/).include?(env)
      end

      # Handles queueing delayed items
      # at_time - Time to start scheduling items (default: now).
      def handle_delayed_items(at_time = nil)
        timestamp = Resque.next_delayed_timestamp(at_time)
        if timestamp
          procline 'Processing Delayed Items'
          until timestamp.nil?
            enqueue_delayed_items_for_timestamp(timestamp)
            timestamp = Resque.next_delayed_timestamp(at_time)
          end
        end
      end

      # Fetches the next delayed item for the timestamp and enqueues it if
      # there is one. Returns the item (nil when none was available).
      def enqueue_next_item(timestamp)
        item = Resque.next_item_for_timestamp(timestamp)

        if item
          log "queuing #{item['class']} [delayed]"
          enqueue(item)
        end

        item
      end

      # Enqueues all delayed jobs for a timestamp
      def enqueue_delayed_items_for_timestamp(timestamp)
        item = nil
        loop do
          handle_shutdown do
            # Continually check that it is still the master
            item = enqueue_next_item(timestamp) if am_master
          end
          # continue processing until there are no more ready items in this
          # timestamp
          break if item.nil?
        end
      end

      # Enqueues a job from its config hash; any StandardError raised while
      # doing so is passed to the failure handler instead of propagating.
      def enqueue(config)
        enqueue_from_config(config)
      rescue => e
        Resque::Scheduler.failure_handler.on_enqueue_failure(config, e)
      end

      # Runs the given block, exiting the process before and/or after it if
      # the shutdown flag has been set.
      def handle_shutdown
        exit if @shutdown
        yield
        exit if @shutdown
      end

      # Enqueues a job based on a config hash
      #
      # job_config - Hash read with either String or Symbol keys for
      #              'args', 'class' and 'queue'; 'custom_job_class' is
      #              read with a String key only.
      def enqueue_from_config(job_config)
        args = job_config['args'] || job_config[:args]

        klass_name = job_config['class'] || job_config[:class]
        begin
          klass = Resque::Scheduler::Util.constantize(klass_name)
        rescue NameError
          # Class is not loadable here; keep going with the bare name.
          klass = klass_name
        end

        # A Hash is a single argument; anything else is splatted.
        params = args.is_a?(Hash) ? [args] : Array(args)
        queue = job_config['queue'] ||
                job_config[:queue] ||
                Resque.queue_from_class(klass)
        # Support custom job classes like those that inherit from
        # Resque::JobWithStatus (resque-status)
        job_klass = job_config['custom_job_class']
        if job_klass && job_klass != 'Resque::Job'
          # The custom job class API must offer a static "scheduled" method. If
          # the custom job class can not be constantized (via a requeue call
          # from the web perhaps), fall back to enqueing normally via
          # Resque::Job.create.
          begin
            Resque::Scheduler::Util.constantize(job_klass).scheduled(
              queue, klass_name, *params
            )
          rescue NameError
            # Note that the custom job class (job_config['custom_job_class'])
            # is the one enqueued
            Resque::Job.create(queue, job_klass, *params)
          end
        else
          # Hack to avoid havoc for people shoving stuff into queues
          # for non-existent classes (for example: running scheduler in
          # one app that schedules for another.
          if Class === klass
            Resque::Scheduler::Plugin.run_before_delayed_enqueue_hooks(
              klass, *params
            )

            # If the class is a custom job class, call self#scheduled on it.
            # This allows you to do things like Resque.enqueue_at(timestamp,
            # CustomJobClass). Otherwise, pass off to Resque.
            if klass.respond_to?(:scheduled)
              klass.scheduled(queue, klass_name, *params)
            else
              Resque.enqueue_to(queue, klass, *params)
            end
          else
            # This will not run the before_hooks in rescue, but will at least
            # queue the job.
            Resque::Job.create(queue, klass, *params)
          end
        end
      end

      # Returns the memoized Rufus::Scheduler, creating it on first use.
      def rufus_scheduler
        @rufus_scheduler ||= Rufus::Scheduler.new
      end

      # Stops old rufus scheduler and creates a new one. Returns the new
      # rufus scheduler
      def clear_schedule!
        rufus_scheduler.stop
        @rufus_scheduler = nil
        @scheduled_jobs = {}
        rufus_scheduler
      end

      # Discards the current rufus scheduler and loads the schedule again.
      def reload_schedule!
        procline 'Reloading Schedule'
        clear_schedule!
        load_schedule!
      end

      # Drains the :schedules_changed redis set. For each popped name the
      # schedule is reloaded and the job is unscheduled, then scheduled
      # again if it is still present in Resque.schedule.
      def update_schedule
        if Resque.redis.scard(:schedules_changed) > 0
          procline 'Updating schedule'
          loop do
            schedule_name = Resque.redis.spop(:schedules_changed)
            break unless schedule_name
            Resque.reload_schedule!
            still_scheduled = Resque.schedule.key?(schedule_name)
            # The old registration goes away whether the job was changed or
            # removed.
            unschedule_job(schedule_name)
            if still_scheduled
              load_schedule_job(schedule_name, Resque.schedule[schedule_name])
            end
          end
          procline 'Schedules Loaded'
        end
      end

      # Unschedules the rufus job registered under `name`, if any, and
      # forgets it.
      def unschedule_job(name)
        if scheduled_jobs[name]
          log "Removing schedule #{name}"
          scheduled_jobs[name].unschedule
          @scheduled_jobs.delete(name)
        end
      end

      # Sleeps and returns true
      def poll_sleep
        handle_shutdown do
          begin
            poll_sleep_loop
          ensure
            # Cleared even when the sleep is interrupted.
            @sleeping = false
          end
        end
        true
      end

      # Sleeps until `poll_sleep_amount` seconds have elapsed since entry,
      # resuming the sleep with the remaining time if it returns early,
      # and stopping early once `handle_signals_with_operation` reports an
      # Interrupt. With a non-positive amount, signals are handled once
      # without sleeping.
      def poll_sleep_loop
        @sleeping = true
        if poll_sleep_amount > 0
          start = Time.now
          loop do
            elapsed_sleep = (Time.now - start)
            remaining_sleep = poll_sleep_amount - elapsed_sleep
            @do_break = false
            if remaining_sleep <= 0
              @do_break = true
            else
              @do_break = handle_signals_with_operation do
                sleep(remaining_sleep)
              end
            end
            break if @do_break
          end
        else
          handle_signals_with_operation
        end
      end

      # Runs the optional block, then `handle_signals`. Returns false on
      # normal completion. If an Interrupt is raised, runs
      # `before_shutdown` when the shutdown flag is set and returns true.
      def handle_signals_with_operation
        yield if block_given?
        handle_signals
        false
      rescue Interrupt
        before_shutdown if @shutdown
        true
      end

      # Shuts the rufus scheduler down with :wait, then joins it.
      def stop_rufus_scheduler
        rufus_scheduler.shutdown(:wait)
        rufus_scheduler.join
      end

      # Cleanup performed before exiting: stop the rufus scheduler and
      # release the master lock.
      def before_shutdown
        stop_rufus_scheduler
        release_master_lock
      end

      # Sets the shutdown flag, clean schedules and exits if sleeping
      #
      # Subsequent calls are no-ops. While the scheduler is sleeping, an
      # Interrupt is raised in the thread recorded by `run`.
      def shutdown
        return if @shutdown
        @shutdown = true
        log!('Shutting down')
        @th.raise Interrupt if @sleeping
      end

      # Logs msg at info level.
      def log!(msg)
        logger.info { msg }
      end

      # Logs msg at error level.
      def log_error(msg)
        logger.error { msg }
      end

      # Logs msg at debug level.
      def log(msg)
        logger.debug { msg }
      end

      # Logs the status string and sets the process title ($0) to the
      # procline built from it.
      def procline(string)
        log! string
        argv0 = build_procline(string)
        log "Setting procline #{argv0.inspect}"
        $0 = argv0
      end

      # Returns the configured failure handler, defaulting to
      # Resque::Scheduler::FailureHandler.
      def failure_handler
        @failure_handler ||= Resque::Scheduler::FailureHandler
      end

      # Returns the configured logger, building one from the quiet,
      # verbose, logfile and logformat settings on first use.
      def logger
        @logger ||= Resque::Scheduler::LoggerBuilder.new(
          quiet: quiet,
          verbose: verbose,
          log_dev: logfile,
          format: logformat
        ).build
      end

      private

      # Block body of every recurring rufus job: enqueues the job and
      # records the enqueue time, but only while this process is master.
      def enqueue_recurring(name, config)
        if am_master
          log! "queueing #{config['class']} (#{name})"
          enqueue(config)
          Resque.last_enqueued_at(name, Time.now.to_s)
        end
      end

      # "[app_name]" when an app name is set, otherwise an empty string.
      def app_str
        app_name ? "[#{app_name}]" : ''
      end

      # "[env]" when an env is set, otherwise an empty string.
      def env_str
        env ? "[#{env}]" : ''
      end

      # Builds the process title for the given status string.
      def build_procline(string)
        "#{internal_name}#{app_str}#{env_str}: #{string}"
      end

      # Process title prefix including the gem version.
      def internal_name
        "resque-scheduler-#{Resque::Scheduler::VERSION}"
      end

      # Returns the cached master flag, calling `master?` only if the flag
      # has never been set.
      def am_master
        @am_master = master? unless defined?(@am_master)
        @am_master
      end
    end
  end
end