lib/sqsrun/worker.rb in sqsrun-0.4.0 vs lib/sqsrun/worker.rb in sqsrun-0.5.0
- old
+ new
@@ -10,23 +10,28 @@
@secret_key = conf[:secret_key]
@queue_name = conf[:queue]
@visibility_timeout = conf[:timeout]
@extend_timeout = conf[:extend_timeout]
@kill_timeout = conf[:kill_timeout]
+ @kill_retry = conf[:kill_retry]
@interval = conf[:interval]
@finished = false
- @extender = VisibilityExtender.new(@visibility_timeout, @extend_timeout)
+ @extender = TimerThread.new(@visibility_timeout, @extend_timeout, @kill_timeout, @kill_retry)
@sqs = RightAws::SqsGen2.new(@key_id, @secret_key)
@queue = @sqs.queue(@queue_name)
@mutex = Mutex.new
@cond = ConditionVariable.new
end
- def run(run_proc)
+ def init_proc(run_proc, kill_proc)
@run_proc = run_proc
+ @extender.init_proc(kill_proc)
+ end
+
+ def run
@extender.start
until @finished
msg = @queue.receive(@visibility_timeout)
if msg
process(msg)
@@ -77,19 +82,12 @@
@extender.set_message(msg)
success = false
begin
- joined = thread.join(@kill_timeout)
- if joined
- thread.value
- success = true
- puts "finished id=#{msg.id}"
- else
- thread.kill
- puts "killed id=#{msg.id}"
- end
+ thread.join
+ success = true
rescue
puts "failed id=#{msg.id}: #{$!}"
$!.backtrace.each {|bt|
puts " #{bt}"
}
@@ -102,33 +100,43 @@
else
msg.visibility = 0
end
end
- class VisibilityExtender
+ class TimerThread
include MonitorMixin
- def initialize(visibility_timeout, extend_timeout)
+ def initialize(visibility_timeout, extend_timeout, kill_timeout, kill_retry)
super()
@visibility_timeout = visibility_timeout
@extend_timeout = extend_timeout
+ @kill_timeout = kill_timeout
+ @kill_retry = kill_retry
+ @kill_time = nil
+ @kill_proc = nil
@extend_time = nil
@message = nil
@finished = false
end
+ def init_proc(kill_proc)
+ @kill_proc = kill_proc
+ end
+
def start
@thread = Thread.new(&method(:run))
end
def join
@thread.join
end
def set_message(msg)
synchronize do
- @extend_time = Time.now.to_i + @extend_timeout
+ now = Time.now.to_i
+ @extend_time = now + @extend_timeout
+ @kill_time = now + @kill_timeout
@message = msg
end
end
def reset_message
@@ -144,36 +152,52 @@
private
def run
until @finished
sleep 1
synchronize do
- try_extend(@message) if @message
+ if @message
+ now = Time.now.to_i
+ try_kill(now, @message)
+ try_extend(now, @message)
+ end
end
end
end
- def try_extend(msg)
- now = Time.now.to_i
+ def try_extend(now, msg)
if now > @extend_time
ntime = msg.visibility + @visibility_timeout
puts "extending timeout=#{ntime} id=#{msg.id}"
msg.visibility = ntime
@extend_time = now + @extend_timeout
end
end
+
+ def try_kill(now, msg)
+ if now > @kill_time
+ if @kill_proc
+ puts "killing #{msg.id}..."
+ @kill_proc.call rescue nil
+ end
+ @kill_time = now + @kill_retry
+ end
+ end
end
end
class ExecRunner
def initialize(cmd)
@cmd = cmd + ' ' + ARGV.map {|a| Shellwords.escape(a) }.join(' ')
@iobuf = ''
+ @pid = nil
+ @next_kill = :TERM
end
def call(message)
IO.popen(@cmd, "r+") {|io|
+ @pid = io.pid
io.write(message) rescue nil
io.close_write
begin
while true
io.sysread(1024, @iobuf)
@@ -183,9 +207,14 @@
end
}
if $?.to_i != 0
raise "Command failed"
end
+ end
+
+ def terminate
+ Process.kill(@next_kill, @pid)
+ @next_kill = :KILL
end
end
def self.worker=(worker)