module Spool class Pool CHECK_TIMEOUT = 0.01 SIGNALS = { INT: :stop!, TERM: :stop!, QUIT: :stop, HUP: :reload, USR2: :restart, TTIN: :incr, TTOU: :decr } attr_reader :configuration, :working_processes, :zombie_processes def initialize(configuration=nil, &block) @configuration = configuration || DSL.configure(&block) @working_processes = [] @zombie_processes = Set.new @running = false @actions_queue = [] end def running? @running end def stopped? !running? end def all_processes working_processes + zombie_processes.to_a end [:incr, :decr, :reload, :restart, :stop, :stop!].each do |method| define_method method do |*args| actions_queue.push(name: "_#{method}".to_sym, args: args) nil end end def start @running = true handle_signals File.write configuration.pid_file, Process.pid if configuration.pid_file configuration.processes.times.map do working_processes << Spawner.spawn(configuration) end logger.info(self.class) { "SPOOL START => #{format_processes}" } while running? action = actions_queue.pop if action logger.info(self.class) { "Starting action #{action[:name]} with params: [#{action[:args].join(', ')}]" } send action[:name], *action[:args] end if running? check_status sleep CHECK_TIMEOUT end end logger.info(self.class) { "Spool finished successfully!" } end private attr_writer :working_processes, :zombie_processes attr_reader :actions_queue def handle_signals SIGNALS.each do |signal, event| Signal.trap(signal) do logger.info(self.class) { "Signal #{signal} received. Current state of actions queue is:\n#{format_actions_queue}" } send event end end end def check_status clear_dead_processes check_processes_to_restart if configuration.processes > all_processes_count logger.info(self.class) { "Initializing new children. Current State => #{format_processes}" } (configuration.processes - all_processes_count).times do working_processes << Spawner.spawn(configuration) end logger.info(self.class) { "New children: #{working_processes.last.pid}" } elsif configuration.processes < working_processes.count count_to_kill = working_processes.count - configuration.processes logger.info(self.class) { "Killing #{count_to_kill} children. Current state => #{format_processes}" } stop_processes working_processes.take(count_to_kill) logger.info(self.class) { "After killing childers. Current State => #{format_processes}" } end rescue Exception => e log_error e end def _incr(count=1) configuration.processes += count end def _decr(count=1) configuration.processes -= count configuration.processes = 0 if configuration.processes < 0 end def _reload @configuration = DSL.configure configuration.source_file if configuration.source_file end def _restart logger.info(self.class) { "RESTART" } stop_processes working_processes end def _stop(timeout=0) logger.info(self.class) { "SPOOL STOP" } stop_processes working_processes Timeout.timeout(timeout) { wait_for_stopped all_processes } rescue Timeout::Error logger.error(self.class) { "ERROR IN SPOOL STOP. Timeout error" } ensure _stop! @running = false end def _stop! logger.info(self.class) { "SPOOL STOP! Going to kill => #{format_processes}" } all_processes.each do |p| begin send_signal_to(p, configuration.kill_signal) if p.alive? rescue Datacenter::Shell::CommandError => e log_error e end end wait_for_stopped all_processes working_processes.clear zombie_processes.clear File.delete configuration.pid_file if File.exist? configuration.pid_file @running = false end def stop_processes(processes_list) processes_list.each do |p| begin send_signal_to p, configuration.stop_signal zombie_processes << p rescue Exception => e log_error e end end working_processes.delete_if{ |p| zombie_processes.include? p } end def wait_for_stopped(processes_list) while processes_list.any?(&:alive?) sleep 0.01 end end def check_processes_to_restart to_restart = working_processes.select(&configuration.restart_condition) if to_restart.any? logger.info(self.class) {"Restart condition successful in child processes: #{to_restart.map(&:pid)}"} stop_processes to_restart end end def send_signal_to(process, signal) logger.info(self.class) { "Going to send signal #{signal} to process #{process.pid}" } process.send_signal signal end def clear_dead_processes working_processes.delete_if { |p| !p.alive? } zombie_processes.delete_if { |p| !p.alive? } end def all_processes_count working_processes.count + zombie_processes.count end def logger configuration.logger end def log_error(error) logger.error(self.class) { "#{error.message}\n#{error.backtrace.join("\n")}" } end def format_actions_queue return "EMPTY" if actions_queue.empty? actions_queue.map.with_index do |action, index| "#{index+1} => #{action[:name]}" end.join("\n") end def format_processes "Working Processes: #{working_processes.map(&:pid)}, Zombie Processes: #{zombie_processes.map(&:pid)}" end end end