lib/resque/scheduler.rb in resque-scheduler-4.3.1 vs lib/resque/scheduler.rb in resque-scheduler-4.4.0
- old
+ new
@@ -11,10 +11,13 @@
module Scheduler
autoload :Cli, 'resque/scheduler/cli'
autoload :Extension, 'resque/scheduler/extension'
autoload :Util, 'resque/scheduler/util'
autoload :VERSION, 'resque/scheduler/version'
+ INTERMITTENT_ERRORS = [
+ Errno::EAGAIN, Errno::ECONNRESET, Redis::CannotConnectError, Redis::TimeoutError
+ ].freeze
private
extend Resque::Scheduler::Locking
extend Resque::Scheduler::Configuration
@@ -42,29 +45,33 @@
# Fix buffering so we can `rake resque:scheduler > scheduler.log` and
# get output from the child in there.
$stdout.sync = true
$stderr.sync = true
- # Load the schedule into rufus
- # If dynamic is set, load that schedule otherwise use normal load
- if dynamic
- reload_schedule!
- else
- load_schedule!
- end
+ was_master = nil
begin
@th = Thread.current
# Now start the scheduling part of the loop.
loop do
begin
- if master?
+ # Check on changes to master/child
+ @am_master = master?
+ if am_master != was_master
+ procline am_master ? 'Master scheduler' : 'Child scheduler'
+
+ # Load schedule because changed
+ reload_schedule!
+ end
+
+ if am_master
handle_delayed_items
update_schedule if dynamic
end
- rescue Errno::EAGAIN, Errno::ECONNRESET, Redis::CannotConnectError => e
+ was_master = am_master
+ rescue *INTERMITTENT_ERRORS => e
log! e.message
release_master_lock
end
poll_sleep
end
@@ -97,11 +104,11 @@
@scheduled_jobs = {}
Resque.schedule.each do |name, config|
load_schedule_job(name, config)
end
- Resque.redis.del(:schedules_changed)
+ Resque.redis.del(:schedules_changed) if am_master && dynamic
procline 'Schedules Loaded'
end
# modify interval type value to value with options if options available
def optionizate_interval_value(value)
@@ -200,11 +207,11 @@
def enqueue_delayed_items_for_timestamp(timestamp)
item = nil
loop do
handle_shutdown do
# Continually check that it is still the master
- item = enqueue_next_item(timestamp) if master?
+ item = enqueue_next_item(timestamp) if am_master
end
# continue processing until there are no more ready items in this
# timestamp
break if item.nil?
end
@@ -418,14 +425,14 @@
end
private
def enqueue_recurring(name, config)
- if master?
+ if am_master
log! "queueing #{config['class']} (#{name})"
- Resque.last_enqueued_at(name, Time.now.to_s)
enqueue(config)
+ Resque.last_enqueued_at(name, Time.now.to_s)
end
end
def app_str
app_name ? "[#{app_name}]" : ''
@@ -439,9 +446,14 @@
"#{internal_name}#{app_str}#{env_str}: #{string}"
end
def internal_name
"resque-scheduler-#{Resque::Scheduler::VERSION}"
+ end
+
+ def am_master
+ @am_master = master? unless defined?(@am_master)
+ @am_master
end
end
end
end