lib/zhong/job.rb in zhong-0.1.4 vs lib/zhong/job.rb in zhong-0.1.5

- old
+ new

@@ -1,97 +1,92 @@ module Zhong class Job - attr_reader :name, :category + attr_reader :name, :category, :last_ran, :logger, :at, :every, :id - def initialize(name, config = {}, &block) - @name = name + def initialize(job_name, config = {}, &block) + @name = job_name @category = config[:category] + @logger = config[:logger] + @config = config - @at = At.parse(config[:at], grace: config.fetch(:grace, 15.minutes)) - @every = Every.parse(config[:every]) + @at = config[:at] ? At.parse(config[:at], grace: config.fetch(:grace, 15.minutes)) : nil + @every = config[:every] ? Every.parse(config[:every]) : nil - if @at && !@every - @logger.error "warning: #{self} has `at` but no `every`; could run far more often than expected!" - end - fail "must specific either `at` or `every` for job: #{self}" unless @at || @every @block = block @redis = config[:redis] - @logger = config[:logger] @tz = config[:tz] @if = config[:if] - @lock = Suo::Client::Redis.new(lock_key, client: @redis, stale_lock_expiration: config[:long_running_timeout]) - @timeout = 5 - - refresh_last_ran + @long_running_timeout = config[:long_running_timeout] + @running = false + @first_run = true + @id = Digest::SHA256.hexdigest(@name) end def run?(time = Time.now) + if @first_run + clear_last_ran_if_at_changed if @at + refresh_last_ran + @first_run = false + end + run_every?(time) && run_at?(time) && run_if?(time) end def run(time = Time.now, error_handler = nil) return unless run?(time) - if running? - @logger.info "already running: #{self}" - return - end - - @thread = nil locked = false + errored = false - @lock.lock do - locked = true + begin + redis_lock.lock do + locked = true + @running = true - refresh_last_ran + refresh_last_ran - # we need to check again, as another process might have acquired - # the lock right before us and obviated our need to do anything - break unless run?(time) + # we need to check again, as another process might have acquired + # the lock right before us and obviated our need to do anything + break unless run?(time) - if disabled? - @logger.info "disabled: #{self}" - break - end + if disabled? + logger.info "not running, disabled: #{self}" + break + end - @logger.info "running: #{self}" + logger.info "running: #{self}" - if @block - @thread = Thread.new do + if @block begin @block.call rescue => boom - @logger.error "#{self} failed: #{boom}" + logger.error "#{self} failed: #{boom}" error_handler.call(boom, self) if error_handler end - - nil # do not retain thread return value end - end - ran!(time) + ran!(time) + end + rescue Suo::LockClientError => boom + logger.error "unable to run due to client error: #{boom}" + errored = true end - @logger.info "unable to acquire exclusive run lock: #{self}" unless locked - end + @running = false - def stop - return unless running? - Thread.new { @logger.error "killing #{self} due to stop" } # thread necessary due to trap context - @thread.join(@timeout) - @thread.kill + logger.info "unable to acquire exclusive run lock: #{self}" if !locked && !errored end def running? - @thread && @thread.alive? + @running end def refresh_last_ran - last_ran_val = @redis.get(run_time_key) + last_ran_val = @redis.get(last_ran_key) @last_ran = last_ran_val ? Time.at(last_ran_val.to_i) : nil end def disable @redis.set(disabled_key, "true") @@ -104,21 +99,58 @@ def disabled? !!@redis.get(disabled_key) end def to_s - [@category, @name].compact.join(".").freeze + @to_s ||= [@category, @name].compact.join(".").freeze end def next_at every_time = @every.next_at(@last_ran) if @last_ran && @every at_time = @at.next_at(Time.now) if @at [every_time, at_time, Time.now].compact.max || "now" end + def clear + @redis.del(last_ran_key) + end + + def last_ran_key + "zhong:last_ran:#{self}" + end + + def desired_at_key + "zhong:at:#{self}" + end + + def disabled_key + "zhong:disabled:#{self}" + end + + def lock_key + "zhong:lock:#{self}" + end + private + # if the @at value is changed across runs, the last_run becomes invalid + # so clear it + def clear_last_ran_if_at_changed + previous_at_msgpack = @redis.get(desired_at_key) + + if previous_at_msgpack + previous_at = At.deserialize(previous_at_msgpack) + + if previous_at != @at + logger.error "#{self} period changed (from #{previous_at} to #{@at}), clearing last run" + clear + end + end + + @redis.set(desired_at_key, @at.serialize) + end + def run_every?(time) !@last_ran || !@every || @every.next_at(@last_ran) <= time end def run_at?(time) @@ -129,21 +161,13 @@ !@if || @if.call(time) end def ran!(time) @last_ran = time - @redis.set(run_time_key, @last_ran.to_i) + @redis.set(last_ran_key, @last_ran.to_i) end - def run_time_key - "zhong:last_ran:#{self}" - end - - def disabled_key - "zhong:disabled:#{self}" - end - - def lock_key - "zhong:lock:#{self}" + def redis_lock + @lock ||= Suo::Client::Redis.new(lock_key, client: @redis, stale_lock_expiration: @long_running_timeout) end end end