lib/resque/scheduler.rb in nogara-resque-scheduler-2.0.1 vs lib/resque/scheduler.rb in nogara-resque-scheduler-2.0.2
- old
+ new
@@ -1,18 +1,18 @@
require 'rufus/scheduler'
require 'thwait'
+require 'resque/scheduler_locking'
module Resque
class Scheduler
extend Resque::Helpers
+ extend Resque::SchedulerLocking
class << self
- LOCK_TIMEOUT = 60 * 5
-
# If true, logs more stuff...
attr_accessor :verbose
# If set, produces no output
attr_accessor :mute
@@ -37,47 +37,34 @@
def run
$0 = "resque-scheduler: Starting"
# trap signals
register_signal_handlers
- loop do
- got_lock = can_lock_scheduler?
- if got_lock == true
+ # Load the schedule into rufus
+ # If dynamic is set, load that schedule otherwise use normal load
+ if dynamic
+ reload_schedule!
+ else
+ load_schedule!
+ end
- # Load the schedule into rufus
- # If dynamic is set, load that schedule otherwise use normal load
- if dynamic
- reload_schedule!
- else
- load_schedule!
+ # Now start the scheduling part of the loop.
+ loop do
+ if is_master?
+ begin
+ handle_delayed_items
+ update_schedule if dynamic
+ rescue Errno::EAGAIN, Errno::ECONNRESET => e
+ warn e.message
end
-
- first_time = false
-
- # Now start the scheduling part of the loop.
-
- 30.times do #30 * 5 seconds, it should be less than the timeout defined above
- # loop do
- begin
- handle_delayed_items
- update_schedule if dynamic
- rescue Errno::EAGAIN, Errno::ECONNRESET => e
- warn e.message
- end
- poll_sleep
- end
-
- unlock_scheduler
- clear_schedule!
-
- else
- puts "Scheduler locked!!!"
- sleep 5
end
+ poll_sleep
end
+
# never gets here.
end
+
# For all signals, set the shutdown flag and wait for current
# poll/enqueing to finish (should be almost istant). In the
# case of sleeping, exit immediately.
def register_signal_handlers
@@ -149,12 +136,14 @@
interval_types = %w{cron every}
interval_types.each do |interval_type|
if !config[interval_type].nil? && config[interval_type].length > 0
args = optionizate_interval_value(config[interval_type])
@@scheduled_jobs[name] = rufus_scheduler.send(interval_type, *args) do
- log! "queueing #{config['class']} (#{name})"
- handle_errors { enqueue_from_config(config) }
+ if is_master?
+ log! "queueing #{config['class']} (#{name})"
+ handle_errors { enqueue_from_config(config) }
+ end
end
interval_defined = true
break
end
end
@@ -185,30 +174,23 @@
# Enqueues all delayed jobs for a timestamp
def enqueue_delayed_items_for_timestamp(timestamp)
item = nil
begin
handle_shutdown do
- if item = Resque.next_item_for_timestamp(timestamp)
+ # Continually check that it is still the master
+ if is_master? && item = Resque.next_item_for_timestamp(timestamp)
log "queuing #{item['class']} [delayed]"
handle_errors { enqueue_from_config(item) }
end
end
# continue processing until there are no more ready items in this timestamp
end while !item.nil?
end
def handle_shutdown
- begin
- unlock_scheduler if @shutdown
- rescue
- end
exit if @shutdown
yield
- begin
- unlock_scheduler if @shutdown
- rescue
- end
exit if @shutdown
end
def handle_errors
begin
@@ -320,40 +302,9 @@
end
def procline(string)
log! string
$0 = "resque-scheduler-#{ResqueScheduler::VERSION}: #{string}"
- end
-
- def lock_timeout
- Time.now.utc.to_i + LOCK_TIMEOUT + 1
- end
-
- def can_lock_scheduler?
- #using logic from http://redis.io/commands/getset
- got_lock = Resque.redis.setnx('scheduler:lock', lock_timeout)
- puts "First get lock #{got_lock}"
- unless got_lock
- timestamp = Resque.redis.get('scheduler:lock').to_i
- puts "Timestamp: #{timestamp}"
- timestamp_now = Time.now.utc.to_i
- puts "Timestamp Now: #{timestamp_now}"
- if timestamp_now > timestamp
- timestamp_old = Resque.redis.getset('scheduler:lock', lock_timeout).to_i
- puts "Timestamp Old: #{timestamp_old}"
- if timestamp_old < timestamp_now
- puts "Got lock here"
- got_lock = true
- end
- end
- end
- puts "Second get lock #{got_lock}"
- got_lock
- end
-
- def unlock_scheduler
- puts "Unlocking scheduler lock"
- Resque.redis.del('scheduler:lock')
end
end
end