lib/zhong/job.rb in zhong-0.1.9 vs lib/zhong/job.rb in zhong-0.2.0

- old
+ new

@@ -1,24 +1,26 @@ module Zhong class Job - attr_reader :name, :category, :last_ran, :logger, :at, :every, :id + extend Forwardable + def_delegators Zhong, :redis, :tz, :logger, :heartbeat_key - def initialize(job_name, config = {}, &block) + attr_reader :name, :category, :last_ran, :at, :every, :id + + def initialize(job_name, config = {}, callbacks = {}, &block) @name = job_name @category = config[:category] @logger = config[:logger] @config = config + @callbacks = callbacks @at = config[:at] ? At.parse(config[:at], grace: config.fetch(:grace, 15.minutes)) : nil @every = config[:every] ? Every.parse(config[:every]) : nil raise "must specific either `at` or `every` for job: #{self}" unless @at || @every @block = block - @redis = config[:redis] - @tz = config[:tz] @if = config[:if] @long_running_timeout = config[:long_running_timeout] @running = false @first_run = true @id = Digest::SHA256.hexdigest(@name) @@ -86,24 +88,28 @@ def running? @running end def refresh_last_ran - last_ran_val = @redis.get(last_ran_key) + last_ran_val = redis.get(last_ran_key) @last_ran = last_ran_val ? Time.at(last_ran_val.to_i) : nil end def disable - @redis.set(disabled_key, "true") + fire_callbacks(:before_disable, self) + redis.set(disabled_key, "true") + fire_callbacks(:after_disable, self) end def enable - @redis.del(disabled_key) + fire_callbacks(:before_enable, self) + redis.del(disabled_key) + fire_callbacks(:after_enable, self) end def disabled? - !@redis.get(disabled_key).nil? + !redis.get(disabled_key).nil? end def to_s @to_s ||= [@category, @name].compact.join(".").freeze end @@ -113,11 +119,11 @@ at_time = @at.next_at(Time.now) if @at [every_time, at_time, Time.now].compact.max || "now" end def clear - @redis.del(last_ran_key) + redis.del(last_ran_key) end def last_ran_key "zhong:last_ran:#{self}" end @@ -134,25 +140,31 @@ "zhong:lock:#{self}" end private + def fire_callbacks(event, *args) + @callbacks[event].to_a.map do |callback| + callback.call(*args) + end.compact.all? # do not skip on nils + end + # if the @at value is changed across runs, the last_run becomes invalid # so clear it def clear_last_ran_if_at_changed - previous_at_msgpack = @redis.get(desired_at_key) + previous_at_msgpack = redis.get(desired_at_key) if previous_at_msgpack previous_at = At.deserialize(previous_at_msgpack) if previous_at != @at logger.error "#{self} period changed (from #{previous_at} to #{@at}), clearing last run" clear end end - @redis.set(desired_at_key, @at.serialize) + redis.set(desired_at_key, @at.serialize) end def run_every?(time) !@last_ran || !@every || @every.next_at(@last_ran) <= time end @@ -165,10 +177,10 @@ !@if || @if.call(time) end def ran!(time) @last_ran = time - @redis.set(last_ran_key, @last_ran.to_i) + redis.set(last_ran_key, @last_ran.to_i) end def redis_lock @lock ||= Suo::Client::Redis.new(lock_key, client: @redis, stale_lock_expiration: @long_running_timeout) end