lib/spool/pool.rb in spool-1.0.2 vs lib/spool/pool.rb in spool-1.0.3

- old
+ new

@@ -11,15 +11,16 @@ USR2: :restart, TTIN: :incr, TTOU: :decr } - attr_reader :configuration, :processes + attr_reader :configuration, :working_processes, :zombie_processes def initialize(configuration=nil, &block) @configuration = configuration || DSL.configure(&block) - @processes = [] + @working_processes = [] + @zombie_processes = Set.new @running = false @actions_queue = [] end def running? @@ -28,10 +29,14 @@ def stopped? !running? end + def all_processes + working_processes + zombie_processes.to_a + end + [:incr, :decr, :reload, :restart, :stop, :stop!].each do |method| define_method method do |*args| actions_queue.push(name: "_#{method}".to_sym, args: args) nil end @@ -43,14 +48,14 @@ handle_signals File.write configuration.pid_file, Process.pid if configuration.pid_file configuration.processes.times.map do - processes << Spawner.spawn(configuration) + working_processes << Spawner.spawn(configuration) end - logger.info(self.class) { "SPOOL START childrens: #{processes.map(&:pid)}" } + logger.info(self.class) { "SPOOL START => #{format_processes}" } while running? action = actions_queue.pop if action @@ -67,10 +72,11 @@ logger.info(self.class) { "Spool finished successfully!" } end private + attr_writer :working_processes, :zombie_processes attr_reader :actions_queue def handle_signals SIGNALS.each do |signal, event| Signal.trap(signal) do @@ -79,40 +85,35 @@ end end end def check_status - processes.delete_if { |p| !p.alive? } + clear_dead_processes + + check_processes_to_restart - to_restart = processes.select(&configuration.restart_condition) - logger.info(self.class) {"Restart condition successful in child processes: #{to_restart.map(&:pid)}"} if to_restart.any? - stop_processes to_restart + if configuration.processes > all_processes_count + logger.info(self.class) { "Initializing new children. Current State => #{format_processes}" } - if configuration.processes > processes.count - logger.info(self.class) { "Initialize new children: #{processes.map(&:pid)}" } - - (configuration.processes - processes.count).times do - processes << Spawner.spawn(configuration) + (configuration.processes - all_processes_count).times do + working_processes << Spawner.spawn(configuration) end - logger.info(self.class) { "New children: #{processes.map(&:pid)}" } - elsif configuration.processes < processes.count - logger.info(self.class) { "Kill childrens: #{processes.map(&:pid)}" } + logger.info(self.class) { "New children: #{working_processes.last.pid}" } + elsif configuration.processes < working_processes.count + count_to_kill = working_processes.count - configuration.processes + logger.info(self.class) { "Killing #{count_to_kill} children. Current state => #{format_processes}" } - list = processes.take(processes.count - configuration.processes) - stop_processes list - wait_for_stopped list - list.each { |p| processes.delete p } + stop_processes working_processes.take(count_to_kill) - logger.info(self.class) { "After kill childrens: #{processes.map(&:pid)}" } + logger.info(self.class) { "After killing childers. Current State => #{format_processes}" } end rescue Exception => e log_error e end - def _incr(count=1) configuration.processes += count end def _decr(count=1) @@ -124,65 +125,87 @@ @configuration = DSL.configure configuration.source_file if configuration.source_file end def _restart logger.info(self.class) { "RESTART" } - stop_processes processes + stop_processes working_processes end def _stop(timeout=0) logger.info(self.class) { "SPOOL STOP" } - stop_processes processes - Timeout.timeout(timeout) { wait_for_stopped processes } + stop_processes working_processes + Timeout.timeout(timeout) { wait_for_stopped all_processes } rescue Timeout::Error logger.error(self.class) { "ERROR IN SPOOL STOP. Timeout error" } ensure _stop! @running = false end def _stop! - logger.info(self.class) { "SPOOL STOP! kill this children (#{processes.map(&:pid)})" } + logger.info(self.class) { "SPOOL STOP! Going to kill => #{format_processes}" } - processes.each do |p| + all_processes.each do |p| begin - p.send_signal(configuration.kill_signal) if p.alive? + send_signal_to(p, configuration.kill_signal) if p.alive? rescue Datacenter::Shell::CommandError => e - if p.alive? - log_error e - else - logger.info(self.class) { "Signal KILL was sent to #{p.pid} but process was already dead" } - end + log_error e end end - wait_for_stopped processes + wait_for_stopped all_processes - processes.clear + working_processes.clear + zombie_processes.clear File.delete configuration.pid_file if File.exist? configuration.pid_file @running = false end def stop_processes(processes_list) processes_list.each do |p| begin - logger.info(self.class) {"Going to kill process #{p.pid}, alive? => #{p.alive?}"} - p.send_signal configuration.stop_signal + send_signal_to p, configuration.stop_signal + zombie_processes << p rescue Exception => e log_error e end end + + working_processes.delete_if{ |p| zombie_processes.include? p } end - def wait_for_stopped(processes) - while processes.any?(&:alive?) + def wait_for_stopped(processes_list) + while processes_list.any?(&:alive?) sleep 0.01 end end + def check_processes_to_restart + to_restart = working_processes.select(&configuration.restart_condition) + + if to_restart.any? + logger.info(self.class) {"Restart condition successful in child processes: #{to_restart.map(&:pid)}"} + stop_processes to_restart + end + end + + def send_signal_to(process, signal) + logger.info(self.class) { "Going to send signal #{signal} to process #{process.pid}" } + process.send_signal signal + end + + def clear_dead_processes + working_processes.delete_if { |p| !p.alive? } + zombie_processes.delete_if { |p| !p.alive? } + end + + def all_processes_count + working_processes.count + zombie_processes.count + end + def logger configuration.logger end def log_error(error) @@ -193,9 +216,13 @@ return "EMPTY" if actions_queue.empty? actions_queue.map.with_index do |action, index| "#{index+1} => #{action[:name]}" end.join("\n") + end + + def format_processes + "Working Processes: #{working_processes.map(&:pid)}, Zombie Processes: #{zombie_processes.map(&:pid)}" end end end