lib/say_when/scheduler.rb in say_when-0.4.1 vs lib/say_when/scheduler.rb in say_when-1.0.0

- old
+ new

@@ -1,52 +1,44 @@ -module SayWhen +# encoding: utf-8 +module SayWhen class Scheduler DEFAULT_PROCESSOR_CLASS = SayWhen::Processor::Simple DEFAULT_STORAGE_STRATEGY = :memory @@scheduler = nil - @@lock = nil + @@lock = Mutex.new - attr_accessor :storage_strategy, :processor_class, :tick_length, :reset_acquired_length, :reset_next_at + attr_accessor :storage_strategy, :processor_class, :tick_length attr_accessor :running # support for a singleton scheduler, but you are not restricted to this class << self - def scheduler - self.lock.synchronize { - if @@scheduler.nil? - @@scheduler = self.new - end - } + return @@scheduler if @@scheduler + @@lock.synchronize { @@scheduler = self.new if @@scheduler.nil? } @@scheduler end def configure - yield self.scheduler - self.scheduler + yield scheduler + scheduler end - def lock - @@lock ||= Mutex.new - end - def schedule(job) - self.scheduler.schedule(job) + scheduler.schedule(job) end def start - self.scheduler.start + scheduler.start end end def initialize - self.tick_length = 1 - self.reset_acquired_length = 3600 + self.tick_length = [ENV['SAY_WHEN_TICK_LENGTH'].to_i, 5].max end def processor if @processor.nil? @processor_class ||= DEFAULT_PROCESSOR_CLASS @@ -61,91 +53,58 @@ [$stdout, $stderr].each{|s| s.sync = true; s.flush} trap("TERM", "EXIT") begin + self.running = true logger.info "SayWhen::Scheduler running" + job = nil while running - process_jobs + begin + time_now = Time.now + logger.debug "SayWhen:: Looking for job that should be ready to fire before #{time_now}" + job = job_class.acquire_next(time_now) + if job.nil? + logger.debug "SayWhen:: no jobs to acquire, sleep" + sleep(tick_length) + else + logger.debug "SayWhen:: got a job: #{job.inspect}" + # delegate processing the trigger to the processor + self.processor.process(job) + logger.debug "SayWhen:: job processed" + + # this should update next fire at, and put back in list of scheduled jobs + job.fired(time_now) + logger.debug "SayWhen:: job fired complete" + end + rescue StandardError => ex + job_msg = job && "job: #{job.inspect} " + logger.error "SayWhen:: Failure: #{job_msg}exception: #{ex.message}\n\t#{ex.backtrace.join("\t\n")}" + safe_release(job) + sleep(tick_length) + rescue Interrupt => ex + job_msg = job && "\n - interrupted job: #{job.inspect}\n" + logger.error "\nSayWhen:: Interrupt! #{ex.inspect}#{job_msg}" + safe_release(job) + exit + rescue Exception => ex + job_msg = job && "job: #{job.inspect} " + logger.error "SayWhen:: Exception: #{job_msg}exception: #{ex.message}\n\t#{ex.backtrace.join("\t\n")}" + safe_release(job) + exit + end end - rescue Exception => ex - logger.error "SayWhen::Scheduler stopping, error: #{ex.class.name}: #{ex.message}" - exit end logger.info "SayWhen::Scheduler stopped" end - def process_waiting_jobs(max_jobs=1000) - jobs_processed = 0 - while(jobs_processed < max_jobs) - if job = process_jobs - jobs_processed += 1 - else - break - end - end - return jobs_processed - end - - def process_jobs - job = nil - time_now = Time.now - self.reset_next_at ||= Time.now - - if reset_acquired_length > 0 && reset_next_at <= time_now - self.reset_next_at = time_now + reset_acquired_length - logger.debug "SayWhen:: reset acquired at #{time_now}, try again at #{reset_next_at}" - job_class.reset_acquired(reset_acquired_length) - end - - begin - logger.debug "SayWhen:: Looking for job that should be ready to fire before #{time_now}" - job = job_class.acquire_next(time_now) - rescue StandardError => ex - job_error("Failure to acquire job", job, ex) - job = nil - end - - if job.nil? - logger.debug "SayWhen:: no jobs to acquire, sleep" - sleep(tick_length) - return job - end - - begin - logger.debug "SayWhen:: got a job: #{job.inspect}" - # delegate processing the trigger to the processor - self.processor.process(job) - logger.debug "SayWhen:: job processed" - - # if successful, update next fire at, put back to waiting / ended - job.fired(time_now) - logger.debug "SayWhen:: job fired complete" - rescue StandardError => ex - job_error("Failure to process", job, ex) - end - - return job - - rescue StandardError => ex - job_error("Error!", job, ex) - sleep(tick_length) - return job - rescue Interrupt => ex - job_error("Interrupt!", job, ex) - raise ex - rescue Exception => ex - job_error("Exception!", job, ex) - raise ex - end - - def job_error(msg, job, ex) - job_msg = job && " job:'#{job.inspect}'" - logger.error "SayWhen::Scheduler #{msg}#{job_msg}: #{ex.message}\n\t#{ex.backtrace.join("\t\n")}" + def safe_release(job) job.release if job + rescue + logger "Failed to release job: #{job.inspect}" rescue nil end def stop logger.info "SayWhen::Scheduler stopping..." self.running = false