lib/zhong/scheduler.rb in zhong-0.1.0 vs lib/zhong/scheduler.rb in zhong-0.1.1
- old
+ new
@@ -1,42 +1,73 @@
module Zhong
class Scheduler
attr_reader :config, :redis, :jobs
+ DEFAULT_CONFIG = {
+ timeout: 0.5,
+ grace: 15.minutes,
+ long_running_timeout: 5.minutes
+ }.freeze
+
+ TRAPPED_SIGNALS = %w(QUIT INT TERM).freeze
+
def initialize(config = {})
@jobs = {}
- @config = {timeout: 0.5, grace: 15.minutes, long_running_timeout: 5.minutes}.merge(config)
- @logger = @config[:logger] ||= self.class.default_logger
- @redis = @config[:redis] ||= Redis.new
+ @callbacks = {}
+ @config = DEFAULT_CONFIG.merge(config)
+ @logger = @config[:logger] ||= Util.default_logger
+ @redis = @config[:redis] ||= Redis.new(ENV["REDIS_URL"])
end
def category(name)
- @category = name
+ fail "cannot nest categories: #{name} would be nested in #{@category}" if @category
- yield
+ @category = name.to_s
+ yield(self)
+
@category = nil
end
def every(period, name, opts = {}, &block)
- add(Job.new(scheduler: self, name: name, every: period, at: opts[:at], only_if: opts[:if], category: @category, &block))
+ job = Job.new(name, opts.merge(@config).merge(every: period, category: @category), &block)
+ add(job)
end
+ def error_handler(&block)
+ @error_handler = block if block_given?
+ @error_handler
+ end
+
+ def on(event, &block)
+ fail "Unsupported callback #{event}" unless [:before_tick, :after_tick, :before_run, :after_run].include?(event.to_sym)
+ (@callbacks[event.to_sym] ||= []) << block
+ end
+
def start
- %w(QUIT INT TERM).each do |sig|
+ TRAPPED_SIGNALS.each do |sig|
Signal.trap(sig) { stop }
end
@logger.info "starting at #{redis_time}"
loop do
- now = redis_time
+ if fire_callbacks(:before_tick)
+ now = redis_time
- jobs.each { |_, job| job.run(now) }
+ jobs.each do |_, job|
+ if fire_callbacks(:before_run, job, now)
+ job.run(now)
+ fire_callbacks(:after_run, job, now)
+ end
+ end
- sleep(interval)
+ fire_callbacks(:after_tick)
+ sleep(interval)
+ end
+
break if @stop
end
end
def stop
@@ -44,10 +75,14 @@
@stop = true
jobs.values.each(&:stop)
Thread.new { @logger.info "stopped" }
end
+ def fire_callbacks(event, *args)
+ @callbacks[event].to_a.all? { |h| h.call(*args) }
+ end
+
private
def add(job)
if @jobs.key?(job.to_s)
@logger.error "duplicate job #{job}, skipping"
@@ -63,14 +98,8 @@
def redis_time
s, ms = @redis.time # returns [seconds since epoch, microseconds]
now = Time.at(s + ms / (10**6))
config[:tz] ? now.in_time_zone(config[:tz]) : now
- end
-
- def self.default_logger
- Logger.new(STDOUT).tap do |logger|
- logger.formatter = -> (_, datetime, _, msg) { "#{datetime}: #{msg}\n" }
- end
end
end
end