lib/say_when/scheduler.rb in say_when-0.4.1 vs lib/say_when/scheduler.rb in say_when-1.0.0
- old
+ new
@@ -1,52 +1,44 @@
-module SayWhen
+# encoding: utf-8
+module SayWhen
class Scheduler
DEFAULT_PROCESSOR_CLASS = SayWhen::Processor::Simple
DEFAULT_STORAGE_STRATEGY = :memory
@@scheduler = nil
- @@lock = nil
+ @@lock = Mutex.new
- attr_accessor :storage_strategy, :processor_class, :tick_length, :reset_acquired_length, :reset_next_at
+ attr_accessor :storage_strategy, :processor_class, :tick_length
attr_accessor :running
# support for a singleton scheduler, but you are not restricted to this
class << self
-
def scheduler
- self.lock.synchronize {
- if @@scheduler.nil?
- @@scheduler = self.new
- end
- }
+ return @@scheduler if @@scheduler
+ @@lock.synchronize { @@scheduler = self.new if @@scheduler.nil? }
@@scheduler
end
def configure
- yield self.scheduler
- self.scheduler
+ yield scheduler
+ scheduler
end
- def lock
- @@lock ||= Mutex.new
- end
-
def schedule(job)
- self.scheduler.schedule(job)
+ scheduler.schedule(job)
end
def start
- self.scheduler.start
+ scheduler.start
end
end
def initialize
- self.tick_length = 1
- self.reset_acquired_length = 3600
+ self.tick_length = [ENV['SAY_WHEN_TICK_LENGTH'].to_i, 5].max
end
def processor
if @processor.nil?
@processor_class ||= DEFAULT_PROCESSOR_CLASS
@@ -61,91 +53,58 @@
[$stdout, $stderr].each{|s| s.sync = true; s.flush}
trap("TERM", "EXIT")
begin
+
self.running = true
logger.info "SayWhen::Scheduler running"
+ job = nil
while running
- process_jobs
+ begin
+ time_now = Time.now
+ logger.debug "SayWhen:: Looking for job that should be ready to fire before #{time_now}"
+ job = job_class.acquire_next(time_now)
+ if job.nil?
+ logger.debug "SayWhen:: no jobs to acquire, sleep"
+ sleep(tick_length)
+ else
+ logger.debug "SayWhen:: got a job: #{job.inspect}"
+ # delegate processing the trigger to the processor
+ self.processor.process(job)
+ logger.debug "SayWhen:: job processed"
+
+ # this should update next fire at, and put back in list of scheduled jobs
+ job.fired(time_now)
+ logger.debug "SayWhen:: job fired complete"
+ end
+ rescue StandardError => ex
+ job_msg = job && "job: #{job.inspect} "
+ logger.error "SayWhen:: Failure: #{job_msg}exception: #{ex.message}\n\t#{ex.backtrace.join("\t\n")}"
+ safe_release(job)
+ sleep(tick_length)
+ rescue Interrupt => ex
+ job_msg = job && "\n - interrupted job: #{job.inspect}\n"
+ logger.error "\nSayWhen:: Interrupt! #{ex.inspect}#{job_msg}"
+ safe_release(job)
+ exit
+ rescue Exception => ex
+ job_msg = job && "job: #{job.inspect} "
+ logger.error "SayWhen:: Exception: #{job_msg}exception: #{ex.message}\n\t#{ex.backtrace.join("\t\n")}"
+ safe_release(job)
+ exit
+ end
end
- rescue Exception => ex
- logger.error "SayWhen::Scheduler stopping, error: #{ex.class.name}: #{ex.message}"
- exit
end
logger.info "SayWhen::Scheduler stopped"
end
- def process_waiting_jobs(max_jobs=1000)
- jobs_processed = 0
- while(jobs_processed < max_jobs)
- if job = process_jobs
- jobs_processed += 1
- else
- break
- end
- end
- return jobs_processed
- end
-
- def process_jobs
- job = nil
- time_now = Time.now
- self.reset_next_at ||= Time.now
-
- if reset_acquired_length > 0 && reset_next_at <= time_now
- self.reset_next_at = time_now + reset_acquired_length
- logger.debug "SayWhen:: reset acquired at #{time_now}, try again at #{reset_next_at}"
- job_class.reset_acquired(reset_acquired_length)
- end
-
- begin
- logger.debug "SayWhen:: Looking for job that should be ready to fire before #{time_now}"
- job = job_class.acquire_next(time_now)
- rescue StandardError => ex
- job_error("Failure to acquire job", job, ex)
- job = nil
- end
-
- if job.nil?
- logger.debug "SayWhen:: no jobs to acquire, sleep"
- sleep(tick_length)
- return job
- end
-
- begin
- logger.debug "SayWhen:: got a job: #{job.inspect}"
- # delegate processing the trigger to the processor
- self.processor.process(job)
- logger.debug "SayWhen:: job processed"
-
- # if successful, update next fire at, put back to waiting / ended
- job.fired(time_now)
- logger.debug "SayWhen:: job fired complete"
- rescue StandardError => ex
- job_error("Failure to process", job, ex)
- end
-
- return job
-
- rescue StandardError => ex
- job_error("Error!", job, ex)
- sleep(tick_length)
- return job
- rescue Interrupt => ex
- job_error("Interrupt!", job, ex)
- raise ex
- rescue Exception => ex
- job_error("Exception!", job, ex)
- raise ex
- end
-
- def job_error(msg, job, ex)
- job_msg = job && " job:'#{job.inspect}'"
- logger.error "SayWhen::Scheduler #{msg}#{job_msg}: #{ex.message}\n\t#{ex.backtrace.join("\t\n")}"
+ def safe_release(job)
job.release if job
+ rescue
+ logger "Failed to release job: #{job.inspect}" rescue nil
end
def stop
logger.info "SayWhen::Scheduler stopping..."
self.running = false