lib/resque/worker.rb in resque-1.21.0 vs lib/resque/worker.rb in resque-1.22.0

- old
+ new

@@ -18,10 +18,15 @@ # Boolean indicating whether this worker can or can not fork. # Automatically set if a fork(2) fails. attr_accessor :cant_fork + attr_accessor :term_timeout + + # decide whether to use new_kill_child logic + attr_accessor :term_child + attr_writer :to_s # Returns an array of all worker objects. def self.all Array(redis.smembers(:workers)).map { |id| find(id) }.compact @@ -135,12 +140,17 @@ working_on job if @child = fork srand # Reseeding procline "Forked #{@child} at #{Time.now.to_i}" - Process.wait(@child) + begin + Process.waitpid(@child) + rescue SystemCallError + nil + end else + unregister_signal_handlers if !@cant_fork && term_child procline "Processing #{job.queue} since #{Time.now.to_i}" redis.client.reconnect # Don't share connection with parent perform(job, &block) exit! unless @cant_fork end @@ -236,10 +246,11 @@ end end # Runs all the methods needed when a worker begins its lifecycle. def startup + warn "WARNING: This way of doing signal handling is now deprecated. Please see http://hone.heroku.com/resque/2012/08/21/resque-signals.html for more info." unless term_child enable_gc_optimizations register_signal_handlers prune_dead_workers run_hook :before_first_fork register_worker @@ -269,31 +280,51 @@ trap('TERM') { shutdown! } trap('INT') { shutdown! } begin trap('QUIT') { shutdown } - trap('USR1') { kill_child } + if term_child + trap('USR1') { new_kill_child } + else + trap('USR1') { kill_child } + end trap('USR2') { pause_processing } trap('CONT') { unpause_processing } rescue ArgumentError warn "Signals QUIT, USR1, USR2, and/or CONT not supported." end log! "Registered signals" end + def unregister_signal_handlers + trap('TERM') { raise TermException.new("SIGTERM") } + trap('INT', 'DEFAULT') + + begin + trap('QUIT', 'DEFAULT') + trap('USR1', 'DEFAULT') + trap('USR2', 'DEFAULT') + rescue ArgumentError + end + end + # Schedule this worker for shutdown. Will finish processing the # current job. def shutdown log 'Exiting...' @shutdown = true end # Kill the child and shutdown immediately. def shutdown! shutdown - kill_child + if term_child + new_kill_child + else + kill_child + end end # Should this worker shutdown as soon as current job is finished? def shutdown? @shutdown @@ -309,9 +340,31 @@ else log! "Child #{@child} not found, restarting." shutdown end end + end + + # Kills the forked child immediately with minimal remorse. The job it + # is processing will not be completed. Send the child a TERM signal, + # wait 5 seconds, and then a KILL signal if it has not quit + def new_kill_child + if @child + unless Process.waitpid(@child, Process::WNOHANG) + log! "Sending TERM signal to child #{@child}" + Process.kill("TERM", @child) + (term_timeout.to_f * 10).round.times do |i| + sleep(0.1) + return if Process.waitpid(@child, Process::WNOHANG) + end + log! "Sending KILL signal to child #{@child}" + Process.kill("KILL", @child) + else + log! "Child #{@child} already quit." + end + end + rescue SystemCallError + log! "Child #{@child} already quit and reaped." end # are we paused? def paused? @paused