lib/resque/scheduler.rb in resque-scheduler-2.3.1 vs lib/resque/scheduler.rb in resque-scheduler-2.4.0
- old
+ new
@@ -1,48 +1,96 @@
require 'rufus/scheduler'
require 'resque/scheduler_locking'
require 'resque_scheduler/logger_builder'
module Resque
-
class Scheduler
-
extend Resque::SchedulerLocking
class << self
+ # Allows for block-style configuration
+ def configure
+ yield self
+ end
+ # Used in `#load_schedule_job`
+ attr_writer :env
+
+ def env
+ return @env if @env
+ @env ||= Rails.env if defined?(Rails)
+ @env ||= ENV['RAILS_ENV']
+ @env
+ end
+
# If true, logs more stuff...
- attr_accessor :verbose
+ attr_writer :verbose
+ def verbose
+ @verbose ||= !!ENV['VERBOSE']
+ end
+
# If set, produces no output
- attr_accessor :mute
+ attr_writer :mute
+ def mute
+ @mute ||= !!ENV['MUTE']
+ end
+
# If set, will write messages to the file
- attr_accessor :logfile
+ attr_writer :logfile
+ def logfile
+ @logfile ||= ENV['LOGFILE']
+ end
+
+ # Sets whether to log in 'text' or 'json'
+ attr_writer :logformat
+
+ def logformat
+ @logformat ||= ENV['LOGFORMAT']
+ end
+
# If set, will try to update the schedule in the loop
- attr_accessor :dynamic
+ attr_writer :dynamic
+ def dynamic
+ @dynamic ||= !!ENV['DYNAMIC_SCHEDULE']
+ end
+
+ # If set, will append the app name to procline
+ attr_writer :app_name
+
+ def app_name
+ @app_name ||= ENV['APP_NAME']
+ end
+
# Amount of time in seconds to sleep between polls of the delayed
# queue. Defaults to 5
attr_writer :poll_sleep_amount
+ def poll_sleep_amount
+ @poll_sleep_amount ||=
+ Float(ENV.fetch('RESQUE_SCHEDULER_INTERVAL', '5'))
+ end
+
attr_writer :logger
+ def logger
+ @logger ||= ResqueScheduler::LoggerBuilder.new(
+ :mute => mute,
+ :verbose => verbose,
+ :log_dev => logfile,
+ :format => logformat
+ ).build
+ end
+
# the Rufus::Scheduler jobs that are scheduled
def scheduled_jobs
@@scheduled_jobs
end
- def poll_sleep_amount
- @poll_sleep_amount ||= 5 # seconds
- end
-
- def logger
- @logger ||= ResqueScheduler::LoggerBuilder.new(:mute => mute, :verbose => verbose, :log_dev => logfile).build
- end
-
# Schedule all jobs and continually look for delayed jobs (never returns)
def run
$0 = "resque-scheduler: Starting"
# trap signals
@@ -60,10 +108,12 @@
reload_schedule!
else
load_schedule!
end
+ @th = Thread.current
+
# Now start the scheduling part of the loop.
loop do
if is_master?
begin
handle_delayed_items
@@ -76,11 +126,10 @@
end
# never gets here.
end
-
# For all signals, set the shutdown flag and wait for current
# poll/enqueing to finish (should be almost istant). In the
# case of sleeping, exit immediately.
def register_signal_handlers
trap("TERM") { shutdown }
@@ -137,17 +186,21 @@
end
end
args
end
- # Loads a job schedule into the Rufus::Scheduler and stores it in @@scheduled_jobs
+ # Loads a job schedule into the Rufus::Scheduler and stores it in
+ # @@scheduled_jobs
def load_schedule_job(name, config)
- # If rails_env is set in the config, enforce ENV['RAILS_ENV'] as
- # required for the jobs to be scheduled. If rails_env is missing, the
- # job should be scheduled regardless of what ENV['RAILS_ENV'] is set
- # to.
- if config['rails_env'].nil? || rails_env_matches?(config)
+ # If `rails_env` or `env` is set in the config, load jobs only if they
+ # are meant to be loaded in `Resque::Scheduler.env`. If `rails_env` or
+ # `env` is missing, the job should be scheduled regardless of the value
+ # of `Resque::Scheduler.env`.
+
+ configured_env = config['rails_env'] || config['env']
+
+ if configured_env.nil? || env_matches?(configured_env)
log! "Scheduling #{name} "
interval_defined = false
interval_types = %w{cron every}
interval_types.each do |interval_type|
if !config[interval_type].nil? && config[interval_type].length > 0
@@ -163,19 +216,31 @@
end
end
unless interval_defined
log! "no #{interval_types.join(' / ')} found for #{config['class']} (#{name}) - skipping"
end
+ else
+ log "Skipping schedule of #{name} because configured " <<
+ "env #{configured_env.inspect} does not match current " <<
+ "env #{env.inspect}"
end
end
- # Returns true if the given schedule config hash matches the current
- # ENV['RAILS_ENV']
+ # Returns true if the given schedule config hash matches the current env
def rails_env_matches?(config)
- config['rails_env'] && ENV['RAILS_ENV'] && config['rails_env'].gsub(/\s/,'').split(',').include?(ENV['RAILS_ENV'])
+ warn '`Resque::Scheduler.rails_env_matches?` is deprecated. ' <<
+ 'Please use `Resque::Scheduler.env_matches?` instead.'
+ config['rails_env'] && env &&
+ config['rails_env'].split(/[\s,]+/).include?(env)
end
+ # Returns true if the current env is non-nil and the configured env
+ # (which is a comma-split string) includes the current env.
+ def env_matches?(configured_env)
+ env && configured_env.split(/[\s,]+/).include?(env)
+ end
+
# Handles queueing delayed items
# at_time - Time to start scheduling items (default: now).
def handle_delayed_items(at_time=nil)
if timestamp = Resque.next_delayed_timestamp(at_time)
procline "Processing Delayed Items"
@@ -300,43 +365,67 @@
end
end
# Sleeps and returns true
def poll_sleep
- @sleeping = true
- handle_shutdown { sleep poll_sleep_amount }
- @sleeping = false
+ handle_shutdown do
+ begin
+ begin
+ @sleeping = true
+ sleep poll_sleep_amount
+ @sleeping = false
+ rescue Interrupt
+ if @shutdown
+ Resque.clean_schedules
+ release_master_lock!
+ end
+ end
+ ensure
+ @sleeping = false
+ end
+ end
true
end
# Sets the shutdown flag, clean schedules and exits if sleeping
def shutdown
+ return if @shutdown
@shutdown = true
-
- if @sleeping
- thread = Thread.new do
- Resque.clean_schedules
- release_master_lock!
- end
- thread.join
- exit
- end
+ log!('Shutting down')
+ @th.raise Interrupt if @sleeping
end
def log!(msg)
- logger.info msg
+ logger.info { msg }
end
def log(msg)
- logger.debug msg
+ logger.debug { msg }
end
def procline(string)
log! string
- $0 = "resque-scheduler-#{ResqueScheduler::VERSION}: #{string}"
+ argv0 = build_procline(string)
+ log "Setting procline #{argv0.inspect}"
+ $0 = argv0
end
- end
+ private
- end
+ def app_str
+ app_name ? "[#{app_name}]" : ''
+ end
+ def env_str
+ env ? "[#{env}]" : ''
+ end
+
+ def build_procline(string)
+ "#{internal_name}#{app_str}#{env_str}: #{string}"
+ end
+
+ def internal_name
+ "resque-scheduler-#{ResqueScheduler::VERSION}"
+ end
+ end
+ end
end