lib/say_when/scheduler.rb in say_when-0.2.6 vs lib/say_when/scheduler.rb in say_when-0.3.0

- old
+ new

@@ -5,13 +5,13 @@ DEFAULT_PROCESSOR_CLASS = SayWhen::Processor::Simple DEFAULT_STORAGE_STRATEGY = :memory @@scheduler = nil @@lock = nil - - attr_accessor :storage_strategy, :processor_class, :tick_length + attr_accessor :storage_strategy, :processor_class, :tick_length, :reset_acquired_length + attr_accessor :running # support for a singleton scheduler, but you are not restricted to this class << self @@ -38,25 +38,25 @@ end def start self.scheduler.start end - end def initialize self.tick_length = 1 + self.reset_acquired_length = 3600 end def processor if @processor.nil? @processor_class ||= DEFAULT_PROCESSOR_CLASS @processor = @processor_class.new(self) end @processor end - + def start logger.info "SayWhen::Scheduler starting" [$stdout, $stderr].each{|s| s.sync = true; s.flush} @@ -66,43 +66,70 @@ self.running = true logger.info "SayWhen::Scheduler running" job = nil + reset_next_at = Time.now while running begin time_now = Time.now - logger.debug "SayWhen:: Looking for job that should be ready to fire before #{time_now}" - job = job_class.acquire_next(time_now) + + if reset_acquired_length > 0 && reset_next_at <= time_now + reset_next_at = time_now + reset_acquired_length + logger.debug "SayWhen:: reset acquired at #{time_now}, try again at #{reset_next_at}" + job_class.reset_acquired(reset_acquired_length) + end + + begin + logger.debug "SayWhen:: Looking for job that should be ready to fire before #{time_now}" + job = job_class.acquire_next(time_now) + rescue StandardError => ex + job_error("Failure to acquire job", job, ex) + job = nil + end + if job.nil? logger.debug "SayWhen:: no jobs to acquire, sleep" sleep(tick_length) - else + next + end + + begin logger.debug "SayWhen:: got a job: #{job.inspect}" # delegate processing the trigger to the processor self.processor.process(job) logger.debug "SayWhen:: job processed" - # this should update next fire at, and put back in list of scheduled jobs + # if successful, update next fire at, put back to waiting / ended job.fired(time_now) logger.debug "SayWhen:: job fired complete" + rescue StandardError=>ex + job_error("Failure to process", job, ex) end - rescue Interrupt - job_msg = job && " job:'#{job.inspect}'" - logger.error "\nSayWhen:: Interrupt! #{job_msg}" - exit - rescue StandardError=>ex - job_msg = job && " job:'#{job.inspect}'" - logger.error "SayWhen:: Failure to process#{job_msg}: #{ex.message}\n\t#{ex.backtrace.join("\t\n")}" - job.release if job - rescue Exception=>ex - logger.error "SayWhen:: Exception in process#{job_msg}: #{ex.message}\n\t#{ex.backtrace.join("\t\n")}" - exit + + rescue Interrupt => ex + job_error("Interrupt!", job, ex) + raise ex + rescue StandardError => ex + job_error("Error!", job, ex) + sleep(tick_length) + rescue Exception => ex + job_error("Exception!", job, ex) + raise ex end end + rescue Exception=>ex + logger.error "SayWhen::Scheduler stopping, error: #{ex.class.name}: #{ex.message}" + exit end logger.info "SayWhen::Scheduler stopped" + end + + def job_error(msg, job, ex) + job_msg = job && " job:'#{job.inspect}'" + logger.error "SayWhen::Scheduler #{msg}#{job_msg}: #{ex.message}\n\t#{ex.backtrace.join("\t\n")}" + job.release if job end def stop logger.info "SayWhen::Scheduler stopping..." self.running = false