lib/say_when/scheduler.rb in say_when-1.0.0 vs lib/say_when/scheduler.rb in say_when-2.0.0

- old
+ new

@@ -1,133 +1,99 @@ # encoding: utf-8 +require 'say_when/utils' + module SayWhen class Scheduler + include SayWhen::Utils - DEFAULT_PROCESSOR_CLASS = SayWhen::Processor::Simple - DEFAULT_STORAGE_STRATEGY = :memory + # When passing in a job, can be a Hash, String, or Class + # Hash: { class: '<class name>' } or { job_class: '<class name>' } + # String: '<class name>' + # Class: <job class> + def schedule(job) + storage.create(job) + end - @@scheduler = nil - @@lock = Mutex.new + def schedule_cron(expression, job) + time_zone = if job.is_a?(Hash) + job.delete(:time_zone) + end || 'UTC' + options = job_options(job) + options[:trigger_strategy] = :cron + options[:trigger_options] = { expression: expression, time_zone: time_zone } + schedule(options) + end - attr_accessor :storage_strategy, :processor_class, :tick_length + def schedule_instance(next_at_method = 'next_fire_at', job = {}) + options = job_options(job) + options[:trigger_strategy] = 'instance' + options[:trigger_options] = { next_at_method: next_at_method } + schedule(options) + end - attr_accessor :running + def schedule_once(time, job = {}) + options = job_options(job) + options[:trigger_strategy] = 'once' + options[:trigger_options] = { at: time} + schedule(options) + end - # support for a singleton scheduler, but you are not restricted to this - class << self - def scheduler - return @@scheduler if @@scheduler - @@lock.synchronize { @@scheduler = self.new if @@scheduler.nil? } - @@scheduler - end + def schedule_in(after, job = {}) + options = job_options(job) + options[:trigger_strategy] = 'once' + options[:trigger_options] = { at: (Time.now + after)} + schedule(options) + end - def configure - yield scheduler - scheduler - end - - def schedule(job) - scheduler.schedule(job) - end - - def start - scheduler.start - end + def job_options(job) + { + scheduled: extract_scheduled(job), + job_class: extract_job_class(job), + job_method: extract_job_method(job), + data: extract_data(job) + } end - def initialize - self.tick_length = [ENV['SAY_WHEN_TICK_LENGTH'].to_i, 5].max + def extract_scheduled(job) + job[:scheduled] if job.is_a?(Hash) end - def processor - if @processor.nil? - @processor_class ||= DEFAULT_PROCESSOR_CLASS - @processor = @processor_class.new(self) + def extract_job_class(job) + job_class = if job.is_a?(Hash) + job[:class] || job[:job_class] + elsif job.is_a?(Class) + job.name + elsif job.is_a?(String) + job end - @processor - end - def start - logger.info "SayWhen::Scheduler starting" - - [$stdout, $stderr].each{|s| s.sync = true; s.flush} - - trap("TERM", "EXIT") - - begin - - self.running = true - - logger.info "SayWhen::Scheduler running" - job = nil - while running - begin - time_now = Time.now - logger.debug "SayWhen:: Looking for job that should be ready to fire before #{time_now}" - job = job_class.acquire_next(time_now) - if job.nil? - logger.debug "SayWhen:: no jobs to acquire, sleep" - sleep(tick_length) - else - logger.debug "SayWhen:: got a job: #{job.inspect}" - # delegate processing the trigger to the processor - self.processor.process(job) - logger.debug "SayWhen:: job processed" - - # this should update next fire at, and put back in list of scheduled jobs - job.fired(time_now) - logger.debug "SayWhen:: job fired complete" - end - rescue StandardError => ex - job_msg = job && "job: #{job.inspect} " - logger.error "SayWhen:: Failure: #{job_msg}exception: #{ex.message}\n\t#{ex.backtrace.join("\t\n")}" - safe_release(job) - sleep(tick_length) - rescue Interrupt => ex - job_msg = job && "\n - interrupted job: #{job.inspect}\n" - logger.error "\nSayWhen:: Interrupt! #{ex.inspect}#{job_msg}" - safe_release(job) - exit - rescue Exception => ex - job_msg = job && "job: #{job.inspect} " - logger.error "SayWhen:: Exception: #{job_msg}exception: #{ex.message}\n\t#{ex.backtrace.join("\t\n")}" - safe_release(job) - exit - end - end + if !job_class + raise "Could not identify job class from: #{job}" end - logger.info "SayWhen::Scheduler stopped" + job_class end - def safe_release(job) - job.release if job - rescue - logger "Failed to release job: #{job.inspect}" rescue nil + def extract_job_method(job) + if job.is_a?(Hash) + job[:method] || job[:job_method] + end || 'execute' end - def stop - logger.info "SayWhen::Scheduler stopping..." - self.running = false + def extract_data(job) + job[:data] if job && job.is_a?(Hash) end - def job_class - @job_class ||= load_job_class + def storage=(s) + @storage = s end - def load_job_class - strategy = @storage_strategy || DEFAULT_STORAGE_STRATEGY - require "say_when/storage/#{strategy}/job" - job_class_name = "SayWhen::Storage::#{strategy.to_s.camelize}::Job" - job_class_name.constantize + def storage + @storage ||= load_strategy(:storage, SayWhen.options[:storage_strategy]) end - def schedule(job) - job_class.create(job) - end - def logger - SayWhen::logger + SayWhen.logger end end end