lib/resque/scheduler.rb in resque-scheduler-4.3.1 vs lib/resque/scheduler.rb in resque-scheduler-4.4.0

- old
+ new

@@ -11,10 +11,13 @@ module Scheduler autoload :Cli, 'resque/scheduler/cli' autoload :Extension, 'resque/scheduler/extension' autoload :Util, 'resque/scheduler/util' autoload :VERSION, 'resque/scheduler/version' + INTERMITTENT_ERRORS = [ + Errno::EAGAIN, Errno::ECONNRESET, Redis::CannotConnectError, Redis::TimeoutError + ].freeze private extend Resque::Scheduler::Locking extend Resque::Scheduler::Configuration @@ -42,29 +45,33 @@ # Fix buffering so we can `rake resque:scheduler > scheduler.log` and # get output from the child in there. $stdout.sync = true $stderr.sync = true - # Load the schedule into rufus - # If dynamic is set, load that schedule otherwise use normal load - if dynamic - reload_schedule! - else - load_schedule! - end + was_master = nil begin @th = Thread.current # Now start the scheduling part of the loop. loop do begin - if master? + # Check on changes to master/child + @am_master = master? + if am_master != was_master + procline am_master ? 'Master scheduler' : 'Child scheduler' + + # Load schedule because changed + reload_schedule! + end + + if am_master handle_delayed_items update_schedule if dynamic end - rescue Errno::EAGAIN, Errno::ECONNRESET, Redis::CannotConnectError => e + was_master = am_master + rescue *INTERMITTENT_ERRORS => e log! e.message release_master_lock end poll_sleep end @@ -97,11 +104,11 @@ @scheduled_jobs = {} Resque.schedule.each do |name, config| load_schedule_job(name, config) end - Resque.redis.del(:schedules_changed) + Resque.redis.del(:schedules_changed) if am_master && dynamic procline 'Schedules Loaded' end # modify interval type value to value with options if options available def optionizate_interval_value(value) @@ -200,11 +207,11 @@ def enqueue_delayed_items_for_timestamp(timestamp) item = nil loop do handle_shutdown do # Continually check that it is still the master - item = enqueue_next_item(timestamp) if master? + item = enqueue_next_item(timestamp) if am_master end # continue processing until there are no more ready items in this # timestamp break if item.nil? end @@ -418,14 +425,14 @@ end private def enqueue_recurring(name, config) - if master? + if am_master log! "queueing #{config['class']} (#{name})" - Resque.last_enqueued_at(name, Time.now.to_s) enqueue(config) + Resque.last_enqueued_at(name, Time.now.to_s) end end def app_str app_name ? "[#{app_name}]" : '' @@ -439,9 +446,14 @@ "#{internal_name}#{app_str}#{env_str}: #{string}" end def internal_name "resque-scheduler-#{Resque::Scheduler::VERSION}" + end + + def am_master + @am_master = master? unless defined?(@am_master) + @am_master end end end end