lib/zhong/job.rb in zhong-0.1.9 vs lib/zhong/job.rb in zhong-0.2.0
- old
+ new
@@ -1,24 +1,26 @@
module Zhong
class Job
- attr_reader :name, :category, :last_ran, :logger, :at, :every, :id
+ extend Forwardable
+ def_delegators Zhong, :redis, :tz, :logger, :heartbeat_key
- def initialize(job_name, config = {}, &block)
+ attr_reader :name, :category, :last_ran, :at, :every, :id
+
+ def initialize(job_name, config = {}, callbacks = {}, &block)
@name = job_name
@category = config[:category]
@logger = config[:logger]
@config = config
+ @callbacks = callbacks
@at = config[:at] ? At.parse(config[:at], grace: config.fetch(:grace, 15.minutes)) : nil
@every = config[:every] ? Every.parse(config[:every]) : nil
raise "must specific either `at` or `every` for job: #{self}" unless @at || @every
@block = block
- @redis = config[:redis]
- @tz = config[:tz]
@if = config[:if]
@long_running_timeout = config[:long_running_timeout]
@running = false
@first_run = true
@id = Digest::SHA256.hexdigest(@name)
@@ -86,24 +88,28 @@
def running?
@running
end
def refresh_last_ran
- last_ran_val = @redis.get(last_ran_key)
+ last_ran_val = redis.get(last_ran_key)
@last_ran = last_ran_val ? Time.at(last_ran_val.to_i) : nil
end
def disable
- @redis.set(disabled_key, "true")
+ fire_callbacks(:before_disable, self)
+ redis.set(disabled_key, "true")
+ fire_callbacks(:after_disable, self)
end
def enable
- @redis.del(disabled_key)
+ fire_callbacks(:before_enable, self)
+ redis.del(disabled_key)
+ fire_callbacks(:after_enable, self)
end
def disabled?
- !@redis.get(disabled_key).nil?
+ !redis.get(disabled_key).nil?
end
def to_s
@to_s ||= [@category, @name].compact.join(".").freeze
end
@@ -113,11 +119,11 @@
at_time = @at.next_at(Time.now) if @at
[every_time, at_time, Time.now].compact.max || "now"
end
def clear
- @redis.del(last_ran_key)
+ redis.del(last_ran_key)
end
def last_ran_key
"zhong:last_ran:#{self}"
end
@@ -134,25 +140,31 @@
"zhong:lock:#{self}"
end
private
+ def fire_callbacks(event, *args)
+ @callbacks[event].to_a.map do |callback|
+ callback.call(*args)
+ end.compact.all? # do not skip on nils
+ end
+
# if the @at value is changed across runs, the last_run becomes invalid
# so clear it
def clear_last_ran_if_at_changed
- previous_at_msgpack = @redis.get(desired_at_key)
+ previous_at_msgpack = redis.get(desired_at_key)
if previous_at_msgpack
previous_at = At.deserialize(previous_at_msgpack)
if previous_at != @at
logger.error "#{self} period changed (from #{previous_at} to #{@at}), clearing last run"
clear
end
end
- @redis.set(desired_at_key, @at.serialize)
+ redis.set(desired_at_key, @at.serialize)
end
def run_every?(time)
!@last_ran || !@every || @every.next_at(@last_ran) <= time
end
@@ -165,10 +177,10 @@
!@if || @if.call(time)
end
def ran!(time)
@last_ran = time
- @redis.set(last_ran_key, @last_ran.to_i)
+ redis.set(last_ran_key, @last_ran.to_i)
end
def redis_lock
@lock ||= Suo::Client::Redis.new(lock_key, client: @redis, stale_lock_expiration: @long_running_timeout)
end