lib/spool/pool.rb in spool-1.0.2 vs lib/spool/pool.rb in spool-1.0.3
- old
+ new
@@ -11,15 +11,16 @@
USR2: :restart,
TTIN: :incr,
TTOU: :decr
}
- attr_reader :configuration, :processes
+ attr_reader :configuration, :working_processes, :zombie_processes
def initialize(configuration=nil, &block)
@configuration = configuration || DSL.configure(&block)
- @processes = []
+ @working_processes = []
+ @zombie_processes = Set.new
@running = false
@actions_queue = []
end
def running?
@@ -28,10 +29,14 @@
def stopped?
!running?
end
+ def all_processes
+ working_processes + zombie_processes.to_a
+ end
+
[:incr, :decr, :reload, :restart, :stop, :stop!].each do |method|
define_method method do |*args|
actions_queue.push(name: "_#{method}".to_sym, args: args)
nil
end
@@ -43,14 +48,14 @@
handle_signals
File.write configuration.pid_file, Process.pid if configuration.pid_file
configuration.processes.times.map do
- processes << Spawner.spawn(configuration)
+ working_processes << Spawner.spawn(configuration)
end
- logger.info(self.class) { "SPOOL START childrens: #{processes.map(&:pid)}" }
+ logger.info(self.class) { "SPOOL START => #{format_processes}" }
while running?
action = actions_queue.pop
if action
@@ -67,10 +72,11 @@
logger.info(self.class) { "Spool finished successfully!" }
end
private
+ attr_writer :working_processes, :zombie_processes
attr_reader :actions_queue
def handle_signals
SIGNALS.each do |signal, event|
Signal.trap(signal) do
@@ -79,40 +85,35 @@
end
end
end
def check_status
- processes.delete_if { |p| !p.alive? }
+ clear_dead_processes
+
+ check_processes_to_restart
- to_restart = processes.select(&configuration.restart_condition)
- logger.info(self.class) {"Restart condition successful in child processes: #{to_restart.map(&:pid)}"} if to_restart.any?
- stop_processes to_restart
+ if configuration.processes > all_processes_count
+ logger.info(self.class) { "Initializing new children. Current State => #{format_processes}" }
- if configuration.processes > processes.count
- logger.info(self.class) { "Initialize new children: #{processes.map(&:pid)}" }
-
- (configuration.processes - processes.count).times do
- processes << Spawner.spawn(configuration)
+ (configuration.processes - all_processes_count).times do
+ working_processes << Spawner.spawn(configuration)
end
- logger.info(self.class) { "New children: #{processes.map(&:pid)}" }
- elsif configuration.processes < processes.count
- logger.info(self.class) { "Kill childrens: #{processes.map(&:pid)}" }
+ logger.info(self.class) { "New children: #{working_processes.last.pid}" }
+ elsif configuration.processes < working_processes.count
+ count_to_kill = working_processes.count - configuration.processes
+ logger.info(self.class) { "Killing #{count_to_kill} children. Current state => #{format_processes}" }
- list = processes.take(processes.count - configuration.processes)
- stop_processes list
- wait_for_stopped list
- list.each { |p| processes.delete p }
+ stop_processes working_processes.take(count_to_kill)
- logger.info(self.class) { "After kill childrens: #{processes.map(&:pid)}" }
+ logger.info(self.class) { "After killing childers. Current State => #{format_processes}" }
end
rescue Exception => e
log_error e
end
-
def _incr(count=1)
configuration.processes += count
end
def _decr(count=1)
@@ -124,65 +125,87 @@
@configuration = DSL.configure configuration.source_file if configuration.source_file
end
def _restart
logger.info(self.class) { "RESTART" }
- stop_processes processes
+ stop_processes working_processes
end
def _stop(timeout=0)
logger.info(self.class) { "SPOOL STOP" }
- stop_processes processes
- Timeout.timeout(timeout) { wait_for_stopped processes }
+ stop_processes working_processes
+ Timeout.timeout(timeout) { wait_for_stopped all_processes }
rescue Timeout::Error
logger.error(self.class) { "ERROR IN SPOOL STOP. Timeout error" }
ensure
_stop!
@running = false
end
def _stop!
- logger.info(self.class) { "SPOOL STOP! kill this children (#{processes.map(&:pid)})" }
+ logger.info(self.class) { "SPOOL STOP! Going to kill => #{format_processes}" }
- processes.each do |p|
+ all_processes.each do |p|
begin
- p.send_signal(configuration.kill_signal) if p.alive?
+ send_signal_to(p, configuration.kill_signal) if p.alive?
rescue Datacenter::Shell::CommandError => e
- if p.alive?
- log_error e
- else
- logger.info(self.class) { "Signal KILL was sent to #{p.pid} but process was already dead" }
- end
+ log_error e
end
end
- wait_for_stopped processes
+ wait_for_stopped all_processes
- processes.clear
+ working_processes.clear
+ zombie_processes.clear
File.delete configuration.pid_file if File.exist? configuration.pid_file
@running = false
end
def stop_processes(processes_list)
processes_list.each do |p|
begin
- logger.info(self.class) {"Going to kill process #{p.pid}, alive? => #{p.alive?}"}
- p.send_signal configuration.stop_signal
+ send_signal_to p, configuration.stop_signal
+ zombie_processes << p
rescue Exception => e
log_error e
end
end
+
+ working_processes.delete_if{ |p| zombie_processes.include? p }
end
- def wait_for_stopped(processes)
- while processes.any?(&:alive?)
+ def wait_for_stopped(processes_list)
+ while processes_list.any?(&:alive?)
sleep 0.01
end
end
+ def check_processes_to_restart
+ to_restart = working_processes.select(&configuration.restart_condition)
+
+ if to_restart.any?
+ logger.info(self.class) {"Restart condition successful in child processes: #{to_restart.map(&:pid)}"}
+ stop_processes to_restart
+ end
+ end
+
+ def send_signal_to(process, signal)
+ logger.info(self.class) { "Going to send signal #{signal} to process #{process.pid}" }
+ process.send_signal signal
+ end
+
+ def clear_dead_processes
+ working_processes.delete_if { |p| !p.alive? }
+ zombie_processes.delete_if { |p| !p.alive? }
+ end
+
+ def all_processes_count
+ working_processes.count + zombie_processes.count
+ end
+
def logger
configuration.logger
end
def log_error(error)
@@ -193,9 +216,13 @@
return "EMPTY" if actions_queue.empty?
actions_queue.map.with_index do |action, index|
"#{index+1} => #{action[:name]}"
end.join("\n")
+ end
+
+ def format_processes
+ "Working Processes: #{working_processes.map(&:pid)}, Zombie Processes: #{zombie_processes.map(&:pid)}"
end
end
end