lib/rsched/engine.rb in rsched-0.2.0 vs lib/rsched/engine.rb in rsched-0.3.0

- old
+ new

@@ -1,6 +1,7 @@ require 'thread' +require 'monitor' require 'time' require 'rsched/lock' module RSched @@ -48,57 +49,60 @@ @lock = lock @resume = conf[:resume] @delay = conf[:delay] @interval = conf[:interval] @delete = conf[:delete] + @extend_timeout = conf[:extend_timeout] + @kill_timeout = conf[:kill_timeout] + @kill_retry = conf[:kill_retry] @sched_start = conf[:from] || 0 + @release_on_fail = conf[:release_on_fail] @finished = false @ss = {} + @extender = TimerThread.new(@lock, @extend_timeout, @kill_timeout, @kill_retry) + @mutex = Mutex.new @cond = ConditionVariable.new end # {cron => (ident,action)} def set_sched(ident, action, cron) now = Time.now.to_i @ss[ident] = Sched.new(cron, action, @sched_start, now-@resume, now-@delay) end - def run(run_proc) + def init_proc(run_proc, kill_proc) + @run_proc = run_proc + @extender.init_proc(kill_proc) + end + + def run + @extender.start until @finished one = false now = Time.now.to_i - @delay @ss.each_pair {|ident,s| s.sched(now) s.queue.delete_if {|time| next if @finished - x = @lock.acquire(ident, time) - case x + token = @lock.acquire(ident, time) + case token when nil # already finished true when false # not finished but already locked false else one = true - if process(ident, time, s.action, run_proc) - # success - @lock.finish(x) - try_delete(ident) - true - else - # fail - @lock.release(x) - false - end + process(token, ident, time, s.action) end } break if @finished } @@ -112,10 +116,11 @@ end end def shutdown @finished = true + @extender.shutdown @mutex.synchronize { @cond.broadcast } end @@ -136,38 +141,139 @@ @cond.wait(@mutex, sec) } end end - def process(ident, time, action, run_proc) + def process(token, ident, time, action) + puts "started token=#{token.inspect} time=#{time}" + + @extender.set_token(token) + + success = false begin - run_proc.call(ident, time, action) - return true + @run_proc.call(ident, time, action) + puts "finished token=#{token.inspect}" + success = true rescue - puts "failed ident=#{ident} time=#{time}: #{$!}" + puts "failed token=#{token.inspect} time=#{time}: #{$!}" $!.backtrace.each {|bt| puts " #{bt}" } - return false end + + @extender.reset_token + + if success + @lock.finish(token) + cleanup_old_entries(ident) + true + else + if @release_on_fail + @lock.release(token) + end + false + end end - def try_delete(ident) + def cleanup_old_entries(ident) @lock.delete_before(ident, Time.now.to_i-@delete) end + + class TimerThread + include MonitorMixin + + def initialize(lock, extend_timeout, kill_timeout, kill_retry) + super() + @lock = lock + @extend_timeout = extend_timeout + @kill_timeout = kill_timeout + @kill_retry = kill_retry + @kill_time = nil + @kill_proc = nil + @extend_time = nil + @token = nil + @finished = false + end + + def init_proc(kill_proc) + @kill_proc = kill_proc + end + + def start + @thread = Thread.new(&method(:run)) + end + + def join + @thread.join + end + + def set_token(token) + synchronize do + now = Time.now.to_i + @extend_time = now + @extend_timeout + @kill_time = now + @kill_timeout + @token = token + end + end + + def reset_token + synchronize do + @token = nil + end + end + + def shutdown + @finished = true + end + + private + def run + until @finished + sleep 1 + synchronize do + if @token + now = Time.now.to_i + try_kill(now, @token) + try_extend(now, @token) + end + end + end + end + + def try_extend(now, token) + if now > @extend_time + puts "extending token=#{token.inspect}" + @lock.extend_timeout(token, now) + @extend_time = now + @extend_timeout + end + end + + def try_kill(now, token) + if now > @kill_time + if @kill_proc + puts "killing #{token.inspect}..." + @kill_proc.call rescue nil + end + @kill_time = now + @kill_retry + end + end + end end class ExecRunner def initialize(cmd) @cmd = cmd + ' ' + ARGV.map {|a| Shellwords.escape(a) }.join(' ') @iobuf = '' + @pid = nil + @next_kill = :TERM end def call(ident, time, action) message = [ident, time, action].join("\t") IO.popen(@cmd, "r+") {|io| + @pid = io.pid io.write(message) rescue nil io.close_write begin while true io.sysread(1024, @iobuf) @@ -177,9 +283,14 @@ end } if $?.to_i != 0 raise "Command failed" end + end + + def terminate + Process.kill(@next_kill, @pid) + @next_kill = :KILL end end end # module RSched