lib/zhong/scheduler.rb in zhong-0.1.5 vs lib/zhong/scheduler.rb in zhong-0.1.6
- old
+ new
@@ -17,24 +17,33 @@
@logger = @config[:logger]
@redis = @config[:redis]
@tz = @config[:tz]
@category = nil
@error_handler = nil
+ @running = false
end
+ def clear
+ raise "unable to clear while running; run Zhong.stop first" if @running
+
+ @jobs = {}
+ @callbacks = {}
+ @category = nil
+ end
+
def category(name)
- fail "cannot nest categories: #{name} would be nested in #{@category} (#{caller.first})" if @category
+ raise "cannot nest categories: #{name} would be nested in #{@category} (#{caller.first})" if @category
@category = name.to_s
yield(self)
@category = nil
end
def every(period, name, opts = {}, &block)
- fail "must specify a period for #{name} (#{caller.first})" unless period
+ raise "must specify a period for #{name} (#{caller.first})" unless period
job = Job.new(name, opts.merge(@config).merge(every: period, category: @category), &block)
if jobs.key?(job.id)
@logger.error "duplicate job #{job}, skipping"
@@ -48,22 +57,26 @@
@error_handler = block if block_given?
@error_handler
end
def on(event, &block)
- fail "unknown callback #{event}" unless [:before_tick, :after_tick, :before_run, :after_run].include?(event.to_sym)
+ raise "unknown callback #{event}" unless [:before_tick, :after_tick, :before_run, :after_run].include?(event.to_sym)
(@callbacks[event.to_sym] ||= []) << block
end
def start
@logger.info "starting at #{redis_time}"
@stop = false
trap_signals
+ raise "already running" if @running
+
loop do
+ @running = true
+
if fire_callbacks(:before_tick)
now = redis_time
jobs_to_run(now).each do |_, job|
break if @stop
@@ -81,18 +94,30 @@
end
break if @stop
end
+ @running = false
+
Thread.new { @logger.info "stopped" }.join
end
def stop
- Thread.new { @logger.error "stopping" } # thread necessary due to trap context
+ Thread.new { @logger.error "stopping" } if @running # thread necessary due to trap context
@stop = true
end
+ def find_by_name(job_name)
+ @jobs[Digest::SHA256.hexdigest(job_name)]
+ end
+
+ def redis_time
+ s, ms = @redis.time # returns [seconds since epoch, microseconds]
+ now = Time.at(s + ms / (10**6))
+ @tz ? now.in_time_zone(@tz) : now
+ end
+
private
TRAPPED_SIGNALS = %w(QUIT INT TERM).freeze
private_constant :TRAPPED_SIGNALS
@@ -105,13 +130,13 @@
end
def run_job(job, time = redis_time)
return unless fire_callbacks(:before_run, job, time)
- job.run(time, error_handler)
+ ran = job.run(time, error_handler)
- fire_callbacks(:after_run, job, time)
+ fire_callbacks(:after_run, job, time, ran)
end
def heartbeat(time)
@redis.setex(heartbeat_key, @config[:grace].to_i, time.to_i)
end
@@ -127,14 +152,8 @@
end
def sleep_until_next_second
GC.start
sleep(1.0 - Time.now.subsec + 0.0001)
- end
-
- def redis_time
- s, ms = @redis.time # returns [seconds since epoch, microseconds]
- now = Time.at(s + ms / (10**6))
- @tz ? now.in_time_zone(@tz) : now
end
end
end