lib/rsched/engine.rb in rsched-0.2.0 vs lib/rsched/engine.rb in rsched-0.3.0
- old
+ new
@@ -1,6 +1,7 @@
require 'thread'
+require 'monitor'
require 'time'
require 'rsched/lock'
module RSched
@@ -48,57 +49,60 @@
@lock = lock
@resume = conf[:resume]
@delay = conf[:delay]
@interval = conf[:interval]
@delete = conf[:delete]
+ @extend_timeout = conf[:extend_timeout]
+ @kill_timeout = conf[:kill_timeout]
+ @kill_retry = conf[:kill_retry]
@sched_start = conf[:from] || 0
+ @release_on_fail = conf[:release_on_fail]
@finished = false
@ss = {}
+ @extender = TimerThread.new(@lock, @extend_timeout, @kill_timeout, @kill_retry)
+
@mutex = Mutex.new
@cond = ConditionVariable.new
end
# {cron => (ident,action)}
def set_sched(ident, action, cron)
now = Time.now.to_i
@ss[ident] = Sched.new(cron, action, @sched_start, now-@resume, now-@delay)
end
- def run(run_proc)
+ def init_proc(run_proc, kill_proc)
+ @run_proc = run_proc
+ @extender.init_proc(kill_proc)
+ end
+
+ def run
+ @extender.start
until @finished
one = false
now = Time.now.to_i - @delay
@ss.each_pair {|ident,s|
s.sched(now)
s.queue.delete_if {|time|
next if @finished
- x = @lock.acquire(ident, time)
- case x
+ token = @lock.acquire(ident, time)
+ case token
when nil
# already finished
true
when false
# not finished but already locked
false
else
one = true
- if process(ident, time, s.action, run_proc)
- # success
- @lock.finish(x)
- try_delete(ident)
- true
- else
- # fail
- @lock.release(x)
- false
- end
+ process(token, ident, time, s.action)
end
}
break if @finished
}
@@ -112,10 +116,11 @@
end
end
def shutdown
@finished = true
+ @extender.shutdown
@mutex.synchronize {
@cond.broadcast
}
end
@@ -136,38 +141,139 @@
@cond.wait(@mutex, sec)
}
end
end
- def process(ident, time, action, run_proc)
+ def process(token, ident, time, action)
+ puts "started token=#{token.inspect} time=#{time}"
+
+ @extender.set_token(token)
+
+ success = false
begin
- run_proc.call(ident, time, action)
- return true
+ @run_proc.call(ident, time, action)
+ puts "finished token=#{token.inspect}"
+ success = true
rescue
- puts "failed ident=#{ident} time=#{time}: #{$!}"
+ puts "failed token=#{token.inspect} time=#{time}: #{$!}"
$!.backtrace.each {|bt|
puts " #{bt}"
}
- return false
end
+
+ @extender.reset_token
+
+ if success
+ @lock.finish(token)
+ cleanup_old_entries(ident)
+ true
+ else
+ if @release_on_fail
+ @lock.release(token)
+ end
+ false
+ end
end
- def try_delete(ident)
+ def cleanup_old_entries(ident)
@lock.delete_before(ident, Time.now.to_i-@delete)
end
+
+ class TimerThread
+ include MonitorMixin
+
+ def initialize(lock, extend_timeout, kill_timeout, kill_retry)
+ super()
+ @lock = lock
+ @extend_timeout = extend_timeout
+ @kill_timeout = kill_timeout
+ @kill_retry = kill_retry
+ @kill_time = nil
+ @kill_proc = nil
+ @extend_time = nil
+ @token = nil
+ @finished = false
+ end
+
+ def init_proc(kill_proc)
+ @kill_proc = kill_proc
+ end
+
+ def start
+ @thread = Thread.new(&method(:run))
+ end
+
+ def join
+ @thread.join
+ end
+
+ def set_token(token)
+ synchronize do
+ now = Time.now.to_i
+ @extend_time = now + @extend_timeout
+ @kill_time = now + @kill_timeout
+ @token = token
+ end
+ end
+
+ def reset_token
+ synchronize do
+ @token = nil
+ end
+ end
+
+ def shutdown
+ @finished = true
+ end
+
+ private
+ def run
+ until @finished
+ sleep 1
+ synchronize do
+ if @token
+ now = Time.now.to_i
+ try_kill(now, @token)
+ try_extend(now, @token)
+ end
+ end
+ end
+ end
+
+ def try_extend(now, token)
+ if now > @extend_time
+ puts "extending token=#{token.inspect}"
+ @lock.extend_timeout(token, now)
+ @extend_time = now + @extend_timeout
+ end
+ end
+
+ def try_kill(now, token)
+ if now > @kill_time
+ if @kill_proc
+ puts "killing #{token.inspect}..."
+ @kill_proc.call rescue nil
+ end
+ @kill_time = now + @kill_retry
+ end
+ end
+ end
end
class ExecRunner
def initialize(cmd)
@cmd = cmd + ' ' + ARGV.map {|a| Shellwords.escape(a) }.join(' ')
@iobuf = ''
+ @pid = nil
+ @next_kill = :TERM
end
def call(ident, time, action)
message = [ident, time, action].join("\t")
IO.popen(@cmd, "r+") {|io|
+ @pid = io.pid
io.write(message) rescue nil
io.close_write
begin
while true
io.sysread(1024, @iobuf)
@@ -177,9 +283,14 @@
end
}
if $?.to_i != 0
raise "Command failed"
end
+ end
+
+ def terminate
+ Process.kill(@next_kill, @pid)
+ @next_kill = :KILL
end
end
end # module RSched