lib/zhong/job.rb in zhong-0.1.4 vs lib/zhong/job.rb in zhong-0.1.5
- old
+ new
@@ -1,97 +1,92 @@
module Zhong
class Job
- attr_reader :name, :category
+ attr_reader :name, :category, :last_ran, :logger, :at, :every, :id
- def initialize(name, config = {}, &block)
- @name = name
+ def initialize(job_name, config = {}, &block)
+ @name = job_name
@category = config[:category]
+ @logger = config[:logger]
+ @config = config
- @at = At.parse(config[:at], grace: config.fetch(:grace, 15.minutes))
- @every = Every.parse(config[:every])
+ @at = config[:at] ? At.parse(config[:at], grace: config.fetch(:grace, 15.minutes)) : nil
+ @every = config[:every] ? Every.parse(config[:every]) : nil
- if @at && !@every
- @logger.error "warning: #{self} has `at` but no `every`; could run far more often than expected!"
- end
-
fail "must specific either `at` or `every` for job: #{self}" unless @at || @every
@block = block
@redis = config[:redis]
- @logger = config[:logger]
@tz = config[:tz]
@if = config[:if]
- @lock = Suo::Client::Redis.new(lock_key, client: @redis, stale_lock_expiration: config[:long_running_timeout])
- @timeout = 5
-
- refresh_last_ran
+ @long_running_timeout = config[:long_running_timeout]
+ @running = false
+ @first_run = true
+ @id = Digest::SHA256.hexdigest(@name)
end
def run?(time = Time.now)
+ if @first_run
+ clear_last_ran_if_at_changed if @at
+ refresh_last_ran
+ @first_run = false
+ end
+
run_every?(time) && run_at?(time) && run_if?(time)
end
def run(time = Time.now, error_handler = nil)
return unless run?(time)
- if running?
- @logger.info "already running: #{self}"
- return
- end
-
- @thread = nil
locked = false
+ errored = false
- @lock.lock do
- locked = true
+ begin
+ redis_lock.lock do
+ locked = true
+ @running = true
- refresh_last_ran
+ refresh_last_ran
- # we need to check again, as another process might have acquired
- # the lock right before us and obviated our need to do anything
- break unless run?(time)
+ # we need to check again, as another process might have acquired
+ # the lock right before us and obviated our need to do anything
+ break unless run?(time)
- if disabled?
- @logger.info "disabled: #{self}"
- break
- end
+ if disabled?
+ logger.info "not running, disabled: #{self}"
+ break
+ end
- @logger.info "running: #{self}"
+ logger.info "running: #{self}"
- if @block
- @thread = Thread.new do
+ if @block
begin
@block.call
rescue => boom
- @logger.error "#{self} failed: #{boom}"
+ logger.error "#{self} failed: #{boom}"
error_handler.call(boom, self) if error_handler
end
-
- nil # do not retain thread return value
end
- end
- ran!(time)
+ ran!(time)
+ end
+ rescue Suo::LockClientError => boom
+ logger.error "unable to run due to client error: #{boom}"
+ errored = true
end
- @logger.info "unable to acquire exclusive run lock: #{self}" unless locked
- end
+ @running = false
- def stop
- return unless running?
- Thread.new { @logger.error "killing #{self} due to stop" } # thread necessary due to trap context
- @thread.join(@timeout)
- @thread.kill
+ logger.info "unable to acquire exclusive run lock: #{self}" if !locked && !errored
end
def running?
- @thread && @thread.alive?
+ @running
end
def refresh_last_ran
- last_ran_val = @redis.get(run_time_key)
+ last_ran_val = @redis.get(last_ran_key)
@last_ran = last_ran_val ? Time.at(last_ran_val.to_i) : nil
end
def disable
@redis.set(disabled_key, "true")
@@ -104,21 +99,58 @@
def disabled?
!!@redis.get(disabled_key)
end
def to_s
- [@category, @name].compact.join(".").freeze
+ @to_s ||= [@category, @name].compact.join(".").freeze
end
def next_at
every_time = @every.next_at(@last_ran) if @last_ran && @every
at_time = @at.next_at(Time.now) if @at
[every_time, at_time, Time.now].compact.max || "now"
end
+ def clear
+ @redis.del(last_ran_key)
+ end
+
+ def last_ran_key
+ "zhong:last_ran:#{self}"
+ end
+
+ def desired_at_key
+ "zhong:at:#{self}"
+ end
+
+ def disabled_key
+ "zhong:disabled:#{self}"
+ end
+
+ def lock_key
+ "zhong:lock:#{self}"
+ end
+
private
+ # if the @at value is changed across runs, the last_run becomes invalid
+ # so clear it
+ def clear_last_ran_if_at_changed
+ previous_at_msgpack = @redis.get(desired_at_key)
+
+ if previous_at_msgpack
+ previous_at = At.deserialize(previous_at_msgpack)
+
+ if previous_at != @at
+ logger.error "#{self} period changed (from #{previous_at} to #{@at}), clearing last run"
+ clear
+ end
+ end
+
+ @redis.set(desired_at_key, @at.serialize)
+ end
+
def run_every?(time)
!@last_ran || !@every || @every.next_at(@last_ran) <= time
end
def run_at?(time)
@@ -129,21 +161,13 @@
!@if || @if.call(time)
end
def ran!(time)
@last_ran = time
- @redis.set(run_time_key, @last_ran.to_i)
+ @redis.set(last_ran_key, @last_ran.to_i)
end
- def run_time_key
- "zhong:last_ran:#{self}"
- end
-
- def disabled_key
- "zhong:disabled:#{self}"
- end
-
- def lock_key
- "zhong:lock:#{self}"
+ def redis_lock
+ @lock ||= Suo::Client::Redis.new(lock_key, client: @redis, stale_lock_expiration: @long_running_timeout)
end
end
end