lib/sqsrun/worker.rb in sqsrun-0.4.0 vs lib/sqsrun/worker.rb in sqsrun-0.5.0

- old
+ new

@@ -10,23 +10,28 @@ @secret_key = conf[:secret_key] @queue_name = conf[:queue] @visibility_timeout = conf[:timeout] @extend_timeout = conf[:extend_timeout] @kill_timeout = conf[:kill_timeout] + @kill_retry = conf[:kill_retry] @interval = conf[:interval] @finished = false - @extender = VisibilityExtender.new(@visibility_timeout, @extend_timeout) + @extender = TimerThread.new(@visibility_timeout, @extend_timeout, @kill_timeout, @kill_retry) @sqs = RightAws::SqsGen2.new(@key_id, @secret_key) @queue = @sqs.queue(@queue_name) @mutex = Mutex.new @cond = ConditionVariable.new end - def run(run_proc) + def init_proc(run_proc, kill_proc) @run_proc = run_proc + @extender.init_proc(kill_proc) + end + + def run @extender.start until @finished msg = @queue.receive(@visibility_timeout) if msg process(msg) @@ -77,19 +82,12 @@ @extender.set_message(msg) success = false begin - joined = thread.join(@kill_timeout) - if joined - thread.value - success = true - puts "finished id=#{msg.id}" - else - thread.kill - puts "killed id=#{msg.id}" - end + thread.join + success = true rescue puts "failed id=#{msg.id}: #{$!}" $!.backtrace.each {|bt| puts " #{bt}" } @@ -102,33 +100,43 @@ else msg.visibility = 0 end end - class VisibilityExtender + class TimerThread include MonitorMixin - def initialize(visibility_timeout, extend_timeout) + def initialize(visibility_timeout, extend_timeout, kill_timeout, kill_retry) super() @visibility_timeout = visibility_timeout @extend_timeout = extend_timeout + @kill_timeout = kill_timeout + @kill_retry = kill_retry + @kill_time = nil + @kill_proc = nil @extend_time = nil @message = nil @finished = false end + def init_proc(kill_proc) + @kill_proc = kill_proc + end + def start @thread = Thread.new(&method(:run)) end def join @thread.join end def set_message(msg) synchronize do - @extend_time = Time.now.to_i + @extend_timeout + now = Time.now.to_i + @extend_time = now + @extend_timeout + @kill_time = now + @kill_timeout @message = msg end end def reset_message @@ -144,36 +152,52 @@ private def run until @finished sleep 1 synchronize do - try_extend(@message) if @message + if @message + now = Time.now.to_i + try_kill(now, @message) + try_extend(now, @message) + end end end end - def try_extend(msg) - now = Time.now.to_i + def try_extend(now, msg) if now > @extend_time ntime = msg.visibility + @visibility_timeout puts "extending timeout=#{ntime} id=#{msg.id}" msg.visibility = ntime @extend_time = now + @extend_timeout end end + + def try_kill(now, msg) + if now > @kill_time + if @kill_proc + puts "killing #{msg.id}..." + @kill_proc.call rescue nil + end + @kill_time = now + @kill_retry + end + end end end class ExecRunner def initialize(cmd) @cmd = cmd + ' ' + ARGV.map {|a| Shellwords.escape(a) }.join(' ') @iobuf = '' + @pid = nil + @next_kill = :TERM end def call(message) IO.popen(@cmd, "r+") {|io| + @pid = io.pid io.write(message) rescue nil io.close_write begin while true io.sysread(1024, @iobuf) @@ -183,9 +207,14 @@ end } if $?.to_i != 0 raise "Command failed" end + end + + def terminate + Process.kill(@next_kill, @pid) + @next_kill = :KILL end end def self.worker=(worker)