lib/zhong/scheduler.rb in zhong-0.1.4 vs lib/zhong/scheduler.rb in zhong-0.1.5

- old
+ new

@@ -3,21 +3,24 @@ attr_reader :config, :redis, :jobs DEFAULT_CONFIG = { timeout: 0.5, grace: 15.minutes, - long_running_timeout: 5.minutes + long_running_timeout: 5.minutes, + tz: nil }.freeze - TRAPPED_SIGNALS = %w(QUIT INT TERM).freeze - def initialize(config = {}) @jobs = {} @callbacks = {} @config = DEFAULT_CONFIG.merge(config) - @logger = @config[:logger] ||= Util.default_logger - @redis = @config[:redis] ||= Redis.new(ENV["REDIS_URL"]) + + @logger = @config[:logger] + @redis = @config[:redis] + @tz = @config[:tz] + @category = nil + @error_handler = nil end def category(name) fail "cannot nest categories: #{name} would be nested in #{@category} (#{caller.first})" if @category @@ -28,12 +31,19 @@ @category = nil end def every(period, name, opts = {}, &block) fail "must specify a period for #{name} (#{caller.first})" unless period + job = Job.new(name, opts.merge(@config).merge(every: period, category: @category), &block) - add(job) + + if jobs.key?(job.id) + @logger.error "duplicate job #{job}, skipping" + return + end + + @jobs[job.id] = job end def error_handler(&block) @error_handler = block if block_given? @error_handler @@ -43,66 +53,88 @@ fail "unknown callback #{event}" unless [:before_tick, :after_tick, :before_run, :after_run].include?(event.to_sym) (@callbacks[event.to_sym] ||= []) << block end def start - TRAPPED_SIGNALS.each do |sig| - Signal.trap(sig) { stop } - end - @logger.info "starting at #{redis_time}" + @stop = false + + trap_signals + loop do if fire_callbacks(:before_tick) now = redis_time - jobs.each do |_, job| - if fire_callbacks(:before_run, job, now) - job.run(now, error_handler) - fire_callbacks(:after_run, job, now) - end + jobs_to_run(now).each do |_, job| + break if @stop + run_job(job, now) end + break if @stop + fire_callbacks(:after_tick) - GC.start + heartbeat(now) - sleep(interval) + break if @stop + sleep_until_next_second end break if @stop end + + Thread.new { @logger.info "stopped" }.join end def stop Thread.new { @logger.error "stopping" } # thread necessary due to trap context @stop = true - jobs.values.each(&:stop) - Thread.new { @logger.info "stopped" } end + private + + TRAPPED_SIGNALS = %w(QUIT INT TERM).freeze + private_constant :TRAPPED_SIGNALS + def fire_callbacks(event, *args) @callbacks[event].to_a.all? { |h| h.call(*args) } end - private + def jobs_to_run(time = redis_time) + jobs.select { |_, job| job.run?(time) } + end - def add(job) - if @jobs.key?(job.to_s) - @logger.error "duplicate job #{job}, skipping" - return - end + def run_job(job, time = redis_time) + return unless fire_callbacks(:before_run, job, time) - @jobs[job.to_s] = job + job.run(time, error_handler) + + fire_callbacks(:after_run, job, time) end - def interval - 1.0 - Time.now.subsec + 0.001 + def heartbeat(time) + @redis.setex(heartbeat_key, @config[:grace].to_i, time.to_i) end + def heartbeat_key + @heartbeat_key ||= "zhong:heartbeat:#{`hostname`.strip}##{Process.pid}" + end + + def trap_signals + TRAPPED_SIGNALS.each do |sig| + Signal.trap(sig) { stop } + end + end + + def sleep_until_next_second + GC.start + sleep(1.0 - Time.now.subsec + 0.0001) + end + def redis_time s, ms = @redis.time # returns [seconds since epoch, microseconds] now = Time.at(s + ms / (10**6)) - config[:tz] ? now.in_time_zone(config[:tz]) : now + @tz ? now.in_time_zone(@tz) : now end end end