lib/zhong/scheduler.rb in zhong-0.1.0 vs lib/zhong/scheduler.rb in zhong-0.1.1

- old
+ new

@@ -1,42 +1,73 @@ module Zhong class Scheduler attr_reader :config, :redis, :jobs + DEFAULT_CONFIG = { + timeout: 0.5, + grace: 15.minutes, + long_running_timeout: 5.minutes + }.freeze + + TRAPPED_SIGNALS = %w(QUIT INT TERM).freeze + def initialize(config = {}) @jobs = {} - @config = {timeout: 0.5, grace: 15.minutes, long_running_timeout: 5.minutes}.merge(config) - @logger = @config[:logger] ||= self.class.default_logger - @redis = @config[:redis] ||= Redis.new + @callbacks = {} + @config = DEFAULT_CONFIG.merge(config) + @logger = @config[:logger] ||= Util.default_logger + @redis = @config[:redis] ||= Redis.new(ENV["REDIS_URL"]) end def category(name) - @category = name + fail "cannot nest categories: #{name} would be nested in #{@category}" if @category - yield + @category = name.to_s + yield(self) + @category = nil end def every(period, name, opts = {}, &block) - add(Job.new(scheduler: self, name: name, every: period, at: opts[:at], only_if: opts[:if], category: @category, &block)) + job = Job.new(name, opts.merge(@config).merge(every: period, category: @category), &block) + add(job) end + def error_handler(&block) + @error_handler = block if block_given? + @error_handler + end + + def on(event, &block) + fail "Unsupported callback #{event}" unless [:before_tick, :after_tick, :before_run, :after_run].include?(event.to_sym) + (@callbacks[event.to_sym] ||= []) << block + end + def start - %w(QUIT INT TERM).each do |sig| + TRAPPED_SIGNALS.each do |sig| Signal.trap(sig) { stop } end @logger.info "starting at #{redis_time}" loop do - now = redis_time + if fire_callbacks(:before_tick) + now = redis_time - jobs.each { |_, job| job.run(now) } + jobs.each do |_, job| + if fire_callbacks(:before_run, job, now) + job.run(now) + fire_callbacks(:after_run, job, now) + end + end - sleep(interval) + fire_callbacks(:after_tick) + sleep(interval) + end + break if @stop end end def stop @@ -44,10 +75,14 @@ @stop = true jobs.values.each(&:stop) Thread.new { @logger.info "stopped" } end + def fire_callbacks(event, *args) + @callbacks[event].to_a.all? { |h| h.call(*args) } + end + private def add(job) if @jobs.key?(job.to_s) @logger.error "duplicate job #{job}, skipping" @@ -63,14 +98,8 @@ def redis_time s, ms = @redis.time # returns [seconds since epoch, microseconds] now = Time.at(s + ms / (10**6)) config[:tz] ? now.in_time_zone(config[:tz]) : now - end - - def self.default_logger - Logger.new(STDOUT).tap do |logger| - logger.formatter = -> (_, datetime, _, msg) { "#{datetime}: #{msg}\n" } - end end end end