lib/say_when/scheduler.rb in say_when-1.0.0 vs lib/say_when/scheduler.rb in say_when-2.0.0
- old
+ new
@@ -1,133 +1,99 @@
# encoding: utf-8
+require 'say_when/utils'
+
module SayWhen
class Scheduler
+ include SayWhen::Utils
- DEFAULT_PROCESSOR_CLASS = SayWhen::Processor::Simple
- DEFAULT_STORAGE_STRATEGY = :memory
+ # When passing in a job, can be a Hash, String, or Class
+ # Hash: { class: '<class name>' } or { job_class: '<class name>' }
+ # String: '<class name>'
+ # Class: <job class>
+ def schedule(job)
+ storage.create(job)
+ end
- @@scheduler = nil
- @@lock = Mutex.new
+ def schedule_cron(expression, job)
+ time_zone = if job.is_a?(Hash)
+ job.delete(:time_zone)
+ end || 'UTC'
+ options = job_options(job)
+ options[:trigger_strategy] = :cron
+ options[:trigger_options] = { expression: expression, time_zone: time_zone }
+ schedule(options)
+ end
- attr_accessor :storage_strategy, :processor_class, :tick_length
+ def schedule_instance(next_at_method = 'next_fire_at', job = {})
+ options = job_options(job)
+ options[:trigger_strategy] = 'instance'
+ options[:trigger_options] = { next_at_method: next_at_method }
+ schedule(options)
+ end
- attr_accessor :running
+ def schedule_once(time, job = {})
+ options = job_options(job)
+ options[:trigger_strategy] = 'once'
+ options[:trigger_options] = { at: time}
+ schedule(options)
+ end
- # support for a singleton scheduler, but you are not restricted to this
- class << self
- def scheduler
- return @@scheduler if @@scheduler
- @@lock.synchronize { @@scheduler = self.new if @@scheduler.nil? }
- @@scheduler
- end
+ def schedule_in(after, job = {})
+ options = job_options(job)
+ options[:trigger_strategy] = 'once'
+ options[:trigger_options] = { at: (Time.now + after)}
+ schedule(options)
+ end
- def configure
- yield scheduler
- scheduler
- end
-
- def schedule(job)
- scheduler.schedule(job)
- end
-
- def start
- scheduler.start
- end
+ def job_options(job)
+ {
+ scheduled: extract_scheduled(job),
+ job_class: extract_job_class(job),
+ job_method: extract_job_method(job),
+ data: extract_data(job)
+ }
end
- def initialize
- self.tick_length = [ENV['SAY_WHEN_TICK_LENGTH'].to_i, 5].max
+ def extract_scheduled(job)
+ job[:scheduled] if job.is_a?(Hash)
end
- def processor
- if @processor.nil?
- @processor_class ||= DEFAULT_PROCESSOR_CLASS
- @processor = @processor_class.new(self)
+ def extract_job_class(job)
+ job_class = if job.is_a?(Hash)
+ job[:class] || job[:job_class]
+ elsif job.is_a?(Class)
+ job.name
+ elsif job.is_a?(String)
+ job
end
- @processor
- end
- def start
- logger.info "SayWhen::Scheduler starting"
-
- [$stdout, $stderr].each{|s| s.sync = true; s.flush}
-
- trap("TERM", "EXIT")
-
- begin
-
- self.running = true
-
- logger.info "SayWhen::Scheduler running"
- job = nil
- while running
- begin
- time_now = Time.now
- logger.debug "SayWhen:: Looking for job that should be ready to fire before #{time_now}"
- job = job_class.acquire_next(time_now)
- if job.nil?
- logger.debug "SayWhen:: no jobs to acquire, sleep"
- sleep(tick_length)
- else
- logger.debug "SayWhen:: got a job: #{job.inspect}"
- # delegate processing the trigger to the processor
- self.processor.process(job)
- logger.debug "SayWhen:: job processed"
-
- # this should update next fire at, and put back in list of scheduled jobs
- job.fired(time_now)
- logger.debug "SayWhen:: job fired complete"
- end
- rescue StandardError => ex
- job_msg = job && "job: #{job.inspect} "
- logger.error "SayWhen:: Failure: #{job_msg}exception: #{ex.message}\n\t#{ex.backtrace.join("\t\n")}"
- safe_release(job)
- sleep(tick_length)
- rescue Interrupt => ex
- job_msg = job && "\n - interrupted job: #{job.inspect}\n"
- logger.error "\nSayWhen:: Interrupt! #{ex.inspect}#{job_msg}"
- safe_release(job)
- exit
- rescue Exception => ex
- job_msg = job && "job: #{job.inspect} "
- logger.error "SayWhen:: Exception: #{job_msg}exception: #{ex.message}\n\t#{ex.backtrace.join("\t\n")}"
- safe_release(job)
- exit
- end
- end
+ if !job_class
+ raise "Could not identify job class from: #{job}"
end
- logger.info "SayWhen::Scheduler stopped"
+ job_class
end
- def safe_release(job)
- job.release if job
- rescue
- logger "Failed to release job: #{job.inspect}" rescue nil
+ def extract_job_method(job)
+ if job.is_a?(Hash)
+ job[:method] || job[:job_method]
+ end || 'execute'
end
- def stop
- logger.info "SayWhen::Scheduler stopping..."
- self.running = false
+ def extract_data(job)
+ job[:data] if job && job.is_a?(Hash)
end
- def job_class
- @job_class ||= load_job_class
+ def storage=(s)
+ @storage = s
end
- def load_job_class
- strategy = @storage_strategy || DEFAULT_STORAGE_STRATEGY
- require "say_when/storage/#{strategy}/job"
- job_class_name = "SayWhen::Storage::#{strategy.to_s.camelize}::Job"
- job_class_name.constantize
+ def storage
+ @storage ||= load_strategy(:storage, SayWhen.options[:storage_strategy])
end
- def schedule(job)
- job_class.create(job)
- end
-
def logger
- SayWhen::logger
+ SayWhen.logger
end
end
end