lib/spool/pool.rb in spool-0.1.1 vs lib/spool/pool.rb in spool-1.0.0

- old
+ new

@@ -1,112 +1,101 @@ module Spool class Pool + CHECK_TIMEOUT = 0.01 + SIGNALS = { INT: :stop!, TERM: :stop!, QUIT: :stop, HUP: :reload, USR2: :restart, TTIN: :incr, TTOU: :decr } - attr_reader :configuration, :runner, :processes + attr_reader :configuration, :processes def initialize(configuration=nil, &block) @configuration = configuration || DSL.configure(&block) @processes = [] - @started = false + @running = false + @actions_queue = [] end - def started? - @started + def running? + @running end def stopped? - !started? + !running? end + [:incr, :decr, :reload, :restart, :stop, :stop!].each do |method| + define_method method do |*args| + actions_queue.push(name: "_#{method}".to_sym, args: args) + nil + end + end + def start - @started = true + @running = true handle_signals File.write configuration.pid_file, Process.pid if configuration.pid_file configuration.processes.times.map do processes << Spawner.spawn(configuration) end + logger.info(self.class) { "SPOOL START childrens: #{processes.map(&:pid)}" } - while @started - check_status - sleep 0.05 + while running? + action = actions_queue.pop + + if action + logger.info(self.class) { "Starting action #{action[:name]} with params: [#{action[:args].join(', ')}]" } + send action[:name], *action[:args] + end + + if running? + check_status + sleep CHECK_TIMEOUT + end end - end - def stop(timeout=0) - logger.info(self.class) { "SPOOL STOP" } - stop_processes processes - Timeout.timeout(timeout) { wait_for_stopped processes } - rescue Timeout::Error - logger.error(self.class) { "ERROR IN SPOOL STOP. Timeout error" } - ensure - stop! + logger.info(self.class) { "Spool finished successfully!" } end - def stop! - @started = false - logger.info(self.class) { "SPOOL STOP! kill this children (#{processes.map(&:pid)})" } - processes.each { |p| p.send_signal configuration.kill_signal} - wait_for_stopped processes - processes.clear - File.delete configuration.pid_file if File.exists? configuration.pid_file - end - - def incr(count=1) - configuration.processes += count - end - - def decr(count=1) - configuration.processes -= count - end - - def reload - @configuration = DSL.configure configuration.source_file if configuration.source_file - end - - def restart - logger.info(self.class) { "RESTART" } - stop_processes processes - end - private + attr_reader :actions_queue + def handle_signals SIGNALS.each do |signal, event| - Signal.trap(signal) { send event } + Signal.trap(signal) do + logger.info(self.class) { "Signal #{signal} received. Current state of actions queue is:\n#{format_actions_queue}" } + send event + end end end - def check_status - return if stopped? - - stop_processes processes.select(&configuration.restart_condition) + def check_status processes.delete_if { |p| !p.alive? } + + to_restart = processes.select(&configuration.restart_condition) + stop_processes to_restart - return if stopped? if configuration.processes > processes.count logger.info(self.class) { "Initialize new children: #{processes.map(&:pid)}" } (configuration.processes - processes.count).times do processes << Spawner.spawn(configuration) end - logger.info(self.class) { "new children: #{processes.map(&:pid)}" } - + logger.info(self.class) { "New children: #{processes.map(&:pid)}" } elsif configuration.processes < processes.count logger.info(self.class) { "Kill childrens: #{processes.map(&:pid)}" } list = processes.take(processes.count - configuration.processes) stop_processes list @@ -114,25 +103,97 @@ list.each { |p| processes.delete p } logger.info(self.class) { "After kill childrens: #{processes.map(&:pid)}" } end - rescue - retry + rescue Exception => e + log_error e end + + def _incr(count=1) + configuration.processes += count + end + + def _decr(count=1) + configuration.processes -= count + configuration.processes = 0 if configuration.processes < 0 + end + + def _reload + @configuration = DSL.configure configuration.source_file if configuration.source_file + end + + def _restart + logger.info(self.class) { "RESTART" } + stop_processes processes + end + + def _stop(timeout=0) + logger.info(self.class) { "SPOOL STOP" } + + stop_processes processes + Timeout.timeout(timeout) { wait_for_stopped processes } + rescue Timeout::Error + logger.error(self.class) { "ERROR IN SPOOL STOP. Timeout error" } + ensure + _stop! + @running = false + end + + def _stop! + logger.info(self.class) { "SPOOL STOP! kill this children (#{processes.map(&:pid)})" } + + processes.each do |p| + begin + p.send_signal(configuration.kill_signal) if p.alive? + rescue Datacenter::Shell::CommandError => e + if p.alive? + log_error e + else + logger.info(self.class) { "Signal KILL was sent to #{p.pid} but process was already dead" } + end + end + end + + wait_for_stopped processes + + processes.clear + + File.delete configuration.pid_file if File.exist? configuration.pid_file + @running = false + end + def stop_processes(processes_list) - processes_list.each { |p| p.send_signal configuration.stop_signal } + processes_list.each do |p| + begin + p.send_signal configuration.stop_signal + rescue Exception => e + log_error e + end + end end def wait_for_stopped(processes) while processes.any?(&:alive?) sleep 0.01 end end def logger configuration.logger + end + + def log_error(error) + logger.error(self.class) { "#{error.message}\n#{error.backtrace.join("\n")}" } + end + + def format_actions_queue + return "EMPTY" if actions_queue.empty? + + actions_queue.map.with_index do |action, index| + "#{index+1} => #{a[:name]}" + end.join("\n") end end end \ No newline at end of file