lib/navy/captain.rb in navy-0.0.3 vs lib/navy/captain.rb in navy-1.0.0

- old
+ new

@@ -1,43 +1,40 @@ class Navy::Captain < Navy::Rank # This hash maps PIDs to Officers OFFICERS = {} + RESPAWNS = {} SELF_PIPE = [] # signal queue used for self-piping SIG_QUEUE = [] # list of signals we care about and trap in admiral. QUEUE_SIGS = [ :WINCH, :QUIT, :INT, :TERM, :USR1, :USR2, :HUP, :TTIN, :TTOU ] - attr_accessor :label, :captain_pid, :timeout, :number + attr_accessor :label, :captain_pid, :timeout, :officer_count, :officer_job, :respawn_limit, :respawn_limit_seconds attr_reader :admiral, :options - def initialize(admiral, label, options = {}) - @admiral, @options = admiral, options.dup - @label = label - @number = @options[:number] || 1 - @timeout = 15 - # self.pid = "/tmp/navy-#{label}.pid" - self.after_fork = ->(captain, officer) do - captain.logger.info("(#{captain.label}) officer=#{officer.number} spawned pid=#{$$}") - end - self.before_fork = ->(captain, officer) do - captain.logger.info("(#{captain.label}) officer=#{officer.number} spawning...") - end - self.before_exec = ->(captain) do - captain.logger.info("forked child re-executing...") - end + def initialize(admiral, label, config, options = {}) + @options = options.dup + @options[:use_defaults] = true + @options[:config_file] = config + self.orders = Navy::Captain::Orders.new(self.class, @options) + @options.merge!(orders.set) + + @admiral, @label = admiral, label + + orders.give!(self, except: [ :stderr_path, :stdout_path ]) end def ==(other_label) @label == other_label end def start + orders.give!(self, only: [ :stderr_path, :stdout_path ]) init_self_pipe! QUEUE_SIGS.each do |sig| trap(sig) do logger.debug "captain[#{label}] received #{sig}" if $DEBUG SIG_QUEUE << sig @@ -47,10 +44,11 @@ trap(:CHLD) { awaken_captain } logger.info "captain[#{label}] starting" self.captain_pid = $$ + preload.call(self) if preload spawn_missing_officers self end def join @@ -62,14 +60,15 @@ begin reap_all_officers case SIG_QUEUE.shift when nil + # logger.info "captain[#{label}] heartbeat" # avoid murdering workers after our master process (or the # machine) comes out of suspend/hibernation if (last_check + @timeout) >= (last_check = Time.now) - # sleep_time = murder_lazy_workers + sleep_time = murder_lazy_officers logger.debug("would normally murder lazy officers") if $DEBUG else sleep_time = @timeout/2.0 + 1 logger.debug("waiting #{sleep_time}s after suspend/hibernation") end @@ -94,13 +93,13 @@ # else logger.info "SIGWINCH ignored because we're not daemonized" # end when :TTIN respawn = true - self.number += 1 + self.officer_count += 1 when :TTOU - self.number -= 1 if self.number > 0 + self.officer_count -= 1 if self.officer_count > 0 when :HUP respawn = true # if config.config_file # load_config! # else # exec binary and exit if there's no config file @@ -156,18 +155,40 @@ rescue Errno::ECHILD break end while true end + # forcibly terminate all workers that haven't checked in in timeout seconds. The timeout is implemented using an unlinked File + def murder_lazy_officers + @timeout - 1 + end + def spawn_missing_officers n = -1 - until (n += 1) == @number + until (n += 1) == @officer_count OFFICERS.value?(n) and next - officer = Navy::Officer.new(self, n, options[:job]) + respawns = RESPAWNS[n] + if respawns + first_respawn = respawns.first + respawn_count = respawns.size + if respawn_count >= respawn_limit + if (diff = Time.now - first_respawn) < respawn_limit_seconds + logger.error "(#{label}) officer=#{n} respawn error (#{respawn_count} in #{diff} sec, limit #{respawn_limit} in #{respawn_limit_seconds} sec)" + @officer_count -= 1 + proc_name "captain[#{label}] (error)" + break + else + RESPAWNS[n] = [] + end + end + end + officer = Navy::Officer.new(self, n, officer_job) before_fork.call(self, officer) if before_fork if pid = fork OFFICERS[pid] = officer + RESPAWNS[n] ||= [] + RESPAWNS[n].push(Time.now) else after_fork.call(self, officer) if after_fork officer.start exit end @@ -177,14 +198,14 @@ logger.error(e) rescue nil exit! end def maintain_officer_count - (off = OFFICERS.size - @number) == 0 and return + (off = OFFICERS.size - @officer_count) == 0 and return off < 0 and return spawn_missing_officers OFFICERS.dup.each_pair { |opid,o| - o.number >= @number and kill_officer(:QUIT, opid) rescue nil + o.number >= @officer_count and kill_officer(:QUIT, opid) rescue nil } end # delivers a signal to a officer and fails gracefully if the officer # is no longer running. @@ -204,6 +225,7 @@ SELF_PIPE.each { |io| io.close rescue nil } SELF_PIPE.replace(Kgio::Pipe.new) SELF_PIPE.each { |io| io.fcntl(Fcntl::F_SETFD, Fcntl::FD_CLOEXEC) } end -end +end +require 'navy/captain/orders' \ No newline at end of file