lib/resque/scheduler.rb in nogara-resque-scheduler-2.0.1 vs lib/resque/scheduler.rb in nogara-resque-scheduler-2.0.2

- old
+ new

@@ -1,18 +1,18 @@ require 'rufus/scheduler' require 'thwait' +require 'resque/scheduler_locking' module Resque class Scheduler extend Resque::Helpers + extend Resque::SchedulerLocking class << self - LOCK_TIMEOUT = 60 * 5 - # If true, logs more stuff... attr_accessor :verbose # If set, produces no output attr_accessor :mute @@ -37,47 +37,34 @@ def run $0 = "resque-scheduler: Starting" # trap signals register_signal_handlers - loop do - got_lock = can_lock_scheduler? - if got_lock == true + # Load the schedule into rufus + # If dynamic is set, load that schedule otherwise use normal load + if dynamic + reload_schedule! + else + load_schedule! + end - # Load the schedule into rufus - # If dynamic is set, load that schedule otherwise use normal load - if dynamic - reload_schedule! - else - load_schedule! + # Now start the scheduling part of the loop. + loop do + if is_master? + begin + handle_delayed_items + update_schedule if dynamic + rescue Errno::EAGAIN, Errno::ECONNRESET => e + warn e.message end - - first_time = false - - # Now start the scheduling part of the loop. - - 30.times do #30 * 5 seconds, it should be less than the timeout defined above - # loop do - begin - handle_delayed_items - update_schedule if dynamic - rescue Errno::EAGAIN, Errno::ECONNRESET => e - warn e.message - end - poll_sleep - end - - unlock_scheduler - clear_schedule! - - else - puts "Scheduler locked!!!" - sleep 5 end + poll_sleep end + # never gets here. end + # For all signals, set the shutdown flag and wait for current # poll/enqueing to finish (should be almost istant). In the # case of sleeping, exit immediately. def register_signal_handlers @@ -149,12 +136,14 @@ interval_types = %w{cron every} interval_types.each do |interval_type| if !config[interval_type].nil? && config[interval_type].length > 0 args = optionizate_interval_value(config[interval_type]) @@scheduled_jobs[name] = rufus_scheduler.send(interval_type, *args) do - log! "queueing #{config['class']} (#{name})" - handle_errors { enqueue_from_config(config) } + if is_master? + log! "queueing #{config['class']} (#{name})" + handle_errors { enqueue_from_config(config) } + end end interval_defined = true break end end @@ -185,30 +174,23 @@ # Enqueues all delayed jobs for a timestamp def enqueue_delayed_items_for_timestamp(timestamp) item = nil begin handle_shutdown do - if item = Resque.next_item_for_timestamp(timestamp) + # Continually check that it is still the master + if is_master? && item = Resque.next_item_for_timestamp(timestamp) log "queuing #{item['class']} [delayed]" handle_errors { enqueue_from_config(item) } end end # continue processing until there are no more ready items in this timestamp end while !item.nil? end def handle_shutdown - begin - unlock_scheduler if @shutdown - rescue - end exit if @shutdown yield - begin - unlock_scheduler if @shutdown - rescue - end exit if @shutdown end def handle_errors begin @@ -320,40 +302,9 @@ end def procline(string) log! string $0 = "resque-scheduler-#{ResqueScheduler::VERSION}: #{string}" - end - - def lock_timeout - Time.now.utc.to_i + LOCK_TIMEOUT + 1 - end - - def can_lock_scheduler? - #using logic from http://redis.io/commands/getset - got_lock = Resque.redis.setnx('scheduler:lock', lock_timeout) - puts "First get lock #{got_lock}" - unless got_lock - timestamp = Resque.redis.get('scheduler:lock').to_i - puts "Timestamp: #{timestamp}" - timestamp_now = Time.now.utc.to_i - puts "Timestamp Now: #{timestamp_now}" - if timestamp_now > timestamp - timestamp_old = Resque.redis.getset('scheduler:lock', lock_timeout).to_i - puts "Timestamp Old: #{timestamp_old}" - if timestamp_old < timestamp_now - puts "Got lock here" - got_lock = true - end - end - end - puts "Second get lock #{got_lock}" - got_lock - end - - def unlock_scheduler - puts "Unlocking scheduler lock" - Resque.redis.del('scheduler:lock') end end end