lib/say_when/scheduler.rb in say_when-0.2.6 vs lib/say_when/scheduler.rb in say_when-0.3.0
- old
+ new
@@ -5,13 +5,13 @@
DEFAULT_PROCESSOR_CLASS = SayWhen::Processor::Simple
DEFAULT_STORAGE_STRATEGY = :memory
@@scheduler = nil
@@lock = nil
-
- attr_accessor :storage_strategy, :processor_class, :tick_length
+ attr_accessor :storage_strategy, :processor_class, :tick_length, :reset_acquired_length
+
attr_accessor :running
# support for a singleton scheduler, but you are not restricted to this
class << self
@@ -38,25 +38,25 @@
end
def start
self.scheduler.start
end
-
end
def initialize
self.tick_length = 1
+ self.reset_acquired_length = 3600
end
def processor
if @processor.nil?
@processor_class ||= DEFAULT_PROCESSOR_CLASS
@processor = @processor_class.new(self)
end
@processor
end
-
+
def start
logger.info "SayWhen::Scheduler starting"
[$stdout, $stderr].each{|s| s.sync = true; s.flush}
@@ -66,43 +66,70 @@
self.running = true
logger.info "SayWhen::Scheduler running"
job = nil
+ reset_next_at = Time.now
while running
begin
time_now = Time.now
- logger.debug "SayWhen:: Looking for job that should be ready to fire before #{time_now}"
- job = job_class.acquire_next(time_now)
+
+ if reset_acquired_length > 0 && reset_next_at <= time_now
+ reset_next_at = time_now + reset_acquired_length
+ logger.debug "SayWhen:: reset acquired at #{time_now}, try again at #{reset_next_at}"
+ job_class.reset_acquired(reset_acquired_length)
+ end
+
+ begin
+ logger.debug "SayWhen:: Looking for job that should be ready to fire before #{time_now}"
+ job = job_class.acquire_next(time_now)
+ rescue StandardError => ex
+ job_error("Failure to acquire job", job, ex)
+ job = nil
+ end
+
if job.nil?
logger.debug "SayWhen:: no jobs to acquire, sleep"
sleep(tick_length)
- else
+ next
+ end
+
+ begin
logger.debug "SayWhen:: got a job: #{job.inspect}"
# delegate processing the trigger to the processor
self.processor.process(job)
logger.debug "SayWhen:: job processed"
- # this should update next fire at, and put back in list of scheduled jobs
+ # if successful, update next fire at, put back to waiting / ended
job.fired(time_now)
logger.debug "SayWhen:: job fired complete"
+ rescue StandardError=>ex
+ job_error("Failure to process", job, ex)
end
- rescue Interrupt
- job_msg = job && " job:'#{job.inspect}'"
- logger.error "\nSayWhen:: Interrupt! #{job_msg}"
- exit
- rescue StandardError=>ex
- job_msg = job && " job:'#{job.inspect}'"
- logger.error "SayWhen:: Failure to process#{job_msg}: #{ex.message}\n\t#{ex.backtrace.join("\t\n")}"
- job.release if job
- rescue Exception=>ex
- logger.error "SayWhen:: Exception in process#{job_msg}: #{ex.message}\n\t#{ex.backtrace.join("\t\n")}"
- exit
+
+ rescue Interrupt => ex
+ job_error("Interrupt!", job, ex)
+ raise ex
+ rescue StandardError => ex
+ job_error("Error!", job, ex)
+ sleep(tick_length)
+ rescue Exception => ex
+ job_error("Exception!", job, ex)
+ raise ex
end
end
+ rescue Exception=>ex
+ logger.error "SayWhen::Scheduler stopping, error: #{ex.class.name}: #{ex.message}"
+ exit
end
logger.info "SayWhen::Scheduler stopped"
+ end
+
+ def job_error(msg, job, ex)
+ job_msg = job && " job:'#{job.inspect}'"
+ logger.error "SayWhen::Scheduler #{msg}#{job_msg}: #{ex.message}\n\t#{ex.backtrace.join("\t\n")}"
+ job.release if job
end
def stop
logger.info "SayWhen::Scheduler stopping..."
self.running = false