lib/zhong/scheduler.rb in zhong-0.1.5 vs lib/zhong/scheduler.rb in zhong-0.1.6

- old
+ new

@@ -17,24 +17,33 @@ @logger = @config[:logger] @redis = @config[:redis] @tz = @config[:tz] @category = nil @error_handler = nil + @running = false end + def clear + raise "unable to clear while running; run Zhong.stop first" if @running + + @jobs = {} + @callbacks = {} + @category = nil + end + def category(name) - fail "cannot nest categories: #{name} would be nested in #{@category} (#{caller.first})" if @category + raise "cannot nest categories: #{name} would be nested in #{@category} (#{caller.first})" if @category @category = name.to_s yield(self) @category = nil end def every(period, name, opts = {}, &block) - fail "must specify a period for #{name} (#{caller.first})" unless period + raise "must specify a period for #{name} (#{caller.first})" unless period job = Job.new(name, opts.merge(@config).merge(every: period, category: @category), &block) if jobs.key?(job.id) @logger.error "duplicate job #{job}, skipping" @@ -48,22 +57,26 @@ @error_handler = block if block_given? @error_handler end def on(event, &block) - fail "unknown callback #{event}" unless [:before_tick, :after_tick, :before_run, :after_run].include?(event.to_sym) + raise "unknown callback #{event}" unless [:before_tick, :after_tick, :before_run, :after_run].include?(event.to_sym) (@callbacks[event.to_sym] ||= []) << block end def start @logger.info "starting at #{redis_time}" @stop = false trap_signals + raise "already running" if @running + loop do + @running = true + if fire_callbacks(:before_tick) now = redis_time jobs_to_run(now).each do |_, job| break if @stop @@ -81,18 +94,30 @@ end break if @stop end + @running = false + Thread.new { @logger.info "stopped" }.join end def stop - Thread.new { @logger.error "stopping" } # thread necessary due to trap context + Thread.new { @logger.error "stopping" } if @running # thread necessary due to trap context @stop = true end + def find_by_name(job_name) + @jobs[Digest::SHA256.hexdigest(job_name)] + end + + def redis_time + s, ms = @redis.time # returns [seconds since epoch, microseconds] + now = Time.at(s + ms / (10**6)) + @tz ? now.in_time_zone(@tz) : now + end + private TRAPPED_SIGNALS = %w(QUIT INT TERM).freeze private_constant :TRAPPED_SIGNALS @@ -105,13 +130,13 @@ end def run_job(job, time = redis_time) return unless fire_callbacks(:before_run, job, time) - job.run(time, error_handler) + ran = job.run(time, error_handler) - fire_callbacks(:after_run, job, time) + fire_callbacks(:after_run, job, time, ran) end def heartbeat(time) @redis.setex(heartbeat_key, @config[:grace].to_i, time.to_i) end @@ -127,14 +152,8 @@ end def sleep_until_next_second GC.start sleep(1.0 - Time.now.subsec + 0.0001) - end - - def redis_time - s, ms = @redis.time # returns [seconds since epoch, microseconds] - now = Time.at(s + ms / (10**6)) - @tz ? now.in_time_zone(@tz) : now end end end