lib/resque/worker.rb in resque-1.21.0 vs lib/resque/worker.rb in resque-1.22.0
- old
+ new
@@ -18,10 +18,15 @@
# Boolean indicating whether this worker can or can not fork.
# Automatically set if a fork(2) fails.
attr_accessor :cant_fork
+ attr_accessor :term_timeout
+
+ # decide whether to use new_kill_child logic
+ attr_accessor :term_child
+
attr_writer :to_s
# Returns an array of all worker objects.
def self.all
Array(redis.smembers(:workers)).map { |id| find(id) }.compact
@@ -135,12 +140,17 @@
working_on job
if @child = fork
srand # Reseeding
procline "Forked #{@child} at #{Time.now.to_i}"
- Process.wait(@child)
+ begin
+ Process.waitpid(@child)
+ rescue SystemCallError
+ nil
+ end
else
+ unregister_signal_handlers if !@cant_fork && term_child
procline "Processing #{job.queue} since #{Time.now.to_i}"
redis.client.reconnect # Don't share connection with parent
perform(job, &block)
exit! unless @cant_fork
end
@@ -236,10 +246,11 @@
end
end
# Runs all the methods needed when a worker begins its lifecycle.
def startup
+ warn "WARNING: This way of doing signal handling is now deprecated. Please see http://hone.heroku.com/resque/2012/08/21/resque-signals.html for more info." unless term_child
enable_gc_optimizations
register_signal_handlers
prune_dead_workers
run_hook :before_first_fork
register_worker
@@ -269,31 +280,51 @@
trap('TERM') { shutdown! }
trap('INT') { shutdown! }
begin
trap('QUIT') { shutdown }
- trap('USR1') { kill_child }
+ if term_child
+ trap('USR1') { new_kill_child }
+ else
+ trap('USR1') { kill_child }
+ end
trap('USR2') { pause_processing }
trap('CONT') { unpause_processing }
rescue ArgumentError
warn "Signals QUIT, USR1, USR2, and/or CONT not supported."
end
log! "Registered signals"
end
+ def unregister_signal_handlers
+ trap('TERM') { raise TermException.new("SIGTERM") }
+ trap('INT', 'DEFAULT')
+
+ begin
+ trap('QUIT', 'DEFAULT')
+ trap('USR1', 'DEFAULT')
+ trap('USR2', 'DEFAULT')
+ rescue ArgumentError
+ end
+ end
+
# Schedule this worker for shutdown. Will finish processing the
# current job.
def shutdown
log 'Exiting...'
@shutdown = true
end
# Kill the child and shutdown immediately.
def shutdown!
shutdown
- kill_child
+ if term_child
+ new_kill_child
+ else
+ kill_child
+ end
end
# Should this worker shutdown as soon as current job is finished?
def shutdown?
@shutdown
@@ -309,9 +340,31 @@
else
log! "Child #{@child} not found, restarting."
shutdown
end
end
+ end
+
+ # Kills the forked child immediately with minimal remorse. The job it
+ # is processing will not be completed. Send the child a TERM signal,
+ # wait 5 seconds, and then a KILL signal if it has not quit
+ def new_kill_child
+ if @child
+ unless Process.waitpid(@child, Process::WNOHANG)
+ log! "Sending TERM signal to child #{@child}"
+ Process.kill("TERM", @child)
+ (term_timeout.to_f * 10).round.times do |i|
+ sleep(0.1)
+ return if Process.waitpid(@child, Process::WNOHANG)
+ end
+ log! "Sending KILL signal to child #{@child}"
+ Process.kill("KILL", @child)
+ else
+ log! "Child #{@child} already quit."
+ end
+ end
+ rescue SystemCallError
+ log! "Child #{@child} already quit and reaped."
end
# are we paused?
def paused?
@paused