lib/zhong/scheduler.rb in zhong-0.1.4 vs lib/zhong/scheduler.rb in zhong-0.1.5
- old
+ new
@@ -3,21 +3,24 @@
attr_reader :config, :redis, :jobs
DEFAULT_CONFIG = {
timeout: 0.5,
grace: 15.minutes,
- long_running_timeout: 5.minutes
+ long_running_timeout: 5.minutes,
+ tz: nil
}.freeze
- TRAPPED_SIGNALS = %w(QUIT INT TERM).freeze
-
def initialize(config = {})
@jobs = {}
@callbacks = {}
@config = DEFAULT_CONFIG.merge(config)
- @logger = @config[:logger] ||= Util.default_logger
- @redis = @config[:redis] ||= Redis.new(ENV["REDIS_URL"])
+
+ @logger = @config[:logger]
+ @redis = @config[:redis]
+ @tz = @config[:tz]
+ @category = nil
+ @error_handler = nil
end
def category(name)
fail "cannot nest categories: #{name} would be nested in #{@category} (#{caller.first})" if @category
@@ -28,12 +31,19 @@
@category = nil
end
def every(period, name, opts = {}, &block)
fail "must specify a period for #{name} (#{caller.first})" unless period
+
job = Job.new(name, opts.merge(@config).merge(every: period, category: @category), &block)
- add(job)
+
+ if jobs.key?(job.id)
+ @logger.error "duplicate job #{job}, skipping"
+ return
+ end
+
+ @jobs[job.id] = job
end
def error_handler(&block)
@error_handler = block if block_given?
@error_handler
@@ -43,66 +53,88 @@
fail "unknown callback #{event}" unless [:before_tick, :after_tick, :before_run, :after_run].include?(event.to_sym)
(@callbacks[event.to_sym] ||= []) << block
end
def start
- TRAPPED_SIGNALS.each do |sig|
- Signal.trap(sig) { stop }
- end
-
@logger.info "starting at #{redis_time}"
+ @stop = false
+
+ trap_signals
+
loop do
if fire_callbacks(:before_tick)
now = redis_time
- jobs.each do |_, job|
- if fire_callbacks(:before_run, job, now)
- job.run(now, error_handler)
- fire_callbacks(:after_run, job, now)
- end
+ jobs_to_run(now).each do |_, job|
+ break if @stop
+ run_job(job, now)
end
+ break if @stop
+
fire_callbacks(:after_tick)
- GC.start
+ heartbeat(now)
- sleep(interval)
+ break if @stop
+ sleep_until_next_second
end
break if @stop
end
+
+ Thread.new { @logger.info "stopped" }.join
end
def stop
Thread.new { @logger.error "stopping" } # thread necessary due to trap context
@stop = true
- jobs.values.each(&:stop)
- Thread.new { @logger.info "stopped" }
end
+ private
+
+ TRAPPED_SIGNALS = %w(QUIT INT TERM).freeze
+ private_constant :TRAPPED_SIGNALS
+
def fire_callbacks(event, *args)
@callbacks[event].to_a.all? { |h| h.call(*args) }
end
- private
+ def jobs_to_run(time = redis_time)
+ jobs.select { |_, job| job.run?(time) }
+ end
- def add(job)
- if @jobs.key?(job.to_s)
- @logger.error "duplicate job #{job}, skipping"
- return
- end
+ def run_job(job, time = redis_time)
+ return unless fire_callbacks(:before_run, job, time)
- @jobs[job.to_s] = job
+ job.run(time, error_handler)
+
+ fire_callbacks(:after_run, job, time)
end
- def interval
- 1.0 - Time.now.subsec + 0.001
+ def heartbeat(time)
+ @redis.setex(heartbeat_key, @config[:grace].to_i, time.to_i)
end
+ def heartbeat_key
+ @heartbeat_key ||= "zhong:heartbeat:#{`hostname`.strip}##{Process.pid}"
+ end
+
+ def trap_signals
+ TRAPPED_SIGNALS.each do |sig|
+ Signal.trap(sig) { stop }
+ end
+ end
+
+ def sleep_until_next_second
+ GC.start
+ sleep(1.0 - Time.now.subsec + 0.0001)
+ end
+
def redis_time
s, ms = @redis.time # returns [seconds since epoch, microseconds]
now = Time.at(s + ms / (10**6))
- config[:tz] ? now.in_time_zone(config[:tz]) : now
+ @tz ? now.in_time_zone(@tz) : now
end
end
end