lib/navy/captain.rb in navy-0.0.3 vs lib/navy/captain.rb in navy-1.0.0
- old
+ new
@@ -1,43 +1,40 @@
class Navy::Captain < Navy::Rank
# This hash maps PIDs to Officers
OFFICERS = {}
+ RESPAWNS = {}
SELF_PIPE = []
# signal queue used for self-piping
SIG_QUEUE = []
# list of signals we care about and trap in the captain.
QUEUE_SIGS = [ :WINCH, :QUIT, :INT, :TERM, :USR1, :USR2, :HUP, :TTIN, :TTOU ]
- attr_accessor :label, :captain_pid, :timeout, :number
+ attr_accessor :label, :captain_pid, :timeout, :officer_count, :officer_job, :respawn_limit, :respawn_limit_seconds
attr_reader :admiral, :options
- def initialize(admiral, label, options = {})
- @admiral, @options = admiral, options.dup
- @label = label
- @number = @options[:number] || 1
- @timeout = 15
- # self.pid = "/tmp/navy-#{label}.pid"
- self.after_fork = ->(captain, officer) do
- captain.logger.info("(#{captain.label}) officer=#{officer.number} spawned pid=#{$$}")
- end
- self.before_fork = ->(captain, officer) do
- captain.logger.info("(#{captain.label}) officer=#{officer.number} spawning...")
- end
- self.before_exec = ->(captain) do
- captain.logger.info("forked child re-executing...")
- end
+ # Builds a captain owned by +admiral+ and identified by +label+.
+ #
+ # +options+ is duplicated before being modified; :use_defaults is forced to
+ # true and +config+ is stored under :config_file. A Navy::Captain::Orders
+ # object is created from this class and those options, and orders.set is
+ # merged back into @options.
+ #
+ # orders.give! is called here with :stderr_path and :stdout_path excluded;
+ # those two keys are passed to orders.give! separately in #start.
+ def initialize(admiral, label, config, options = {})
+ @options = options.dup
+ @options[:use_defaults] = true
+ @options[:config_file] = config
+ self.orders = Navy::Captain::Orders.new(self.class, @options)
+ @options.merge!(orders.set)
+
+ @admiral, @label = admiral, label
+
+ orders.give!(self, except: [ :stderr_path, :stdout_path ])
end
# Compares this captain's @label with +other_label+ and returns the result.
# Note that the argument is a label value, not another Captain instance.
def ==(other_label)
@label == other_label
end
def start
+ orders.give!(self, only: [ :stderr_path, :stdout_path ])
init_self_pipe!
QUEUE_SIGS.each do |sig|
trap(sig) do
logger.debug "captain[#{label}] received #{sig}" if $DEBUG
SIG_QUEUE << sig
@@ -47,10 +44,11 @@
trap(:CHLD) { awaken_captain }
logger.info "captain[#{label}] starting"
self.captain_pid = $$
+ preload.call(self) if preload
spawn_missing_officers
self
end
def join
@@ -62,14 +60,15 @@
begin
reap_all_officers
case SIG_QUEUE.shift
when nil
+ # logger.info "captain[#{label}] heartbeat"
# avoid murdering workers after our master process (or the
# machine) comes out of suspend/hibernation
if (last_check + @timeout) >= (last_check = Time.now)
- # sleep_time = murder_lazy_workers
+ sleep_time = murder_lazy_officers
logger.debug("would normally murder lazy officers") if $DEBUG
else
sleep_time = @timeout/2.0 + 1
logger.debug("waiting #{sleep_time}s after suspend/hibernation")
end
@@ -94,13 +93,13 @@
# else
logger.info "SIGWINCH ignored because we're not daemonized"
# end
when :TTIN
respawn = true
- self.number += 1
+ self.officer_count += 1
when :TTOU
- self.number -= 1 if self.number > 0
+ self.officer_count -= 1 if self.officer_count > 0
when :HUP
respawn = true
# if config.config_file
# load_config!
# else # exec binary and exit if there's no config file
@@ -156,18 +155,40 @@
rescue Errno::ECHILD
break
end while true
end
+ # Intended to forcibly terminate all officers that haven't checked in within
+ # @timeout seconds. As written, this method does not inspect or kill any
+ # officer: it only returns @timeout - 1, which the main loop assigns to
+ # sleep_time.
+ # NOTE(review): the earlier comment said the timeout is implemented using an
+ # unlinked File; nothing in this method does that -- confirm where officer
+ # check-ins are tracked before relying on it.
+ def murder_lazy_officers
+ @timeout - 1
+ end
+
def spawn_missing_officers
n = -1
- until (n += 1) == @number
+ until (n += 1) == @officer_count
OFFICERS.value?(n) and next
- officer = Navy::Officer.new(self, n, options[:job])
+ respawns = RESPAWNS[n]
+ if respawns
+ first_respawn = respawns.first
+ respawn_count = respawns.size
+ if respawn_count >= respawn_limit
+ if (diff = Time.now - first_respawn) < respawn_limit_seconds
+ logger.error "(#{label}) officer=#{n} respawn error (#{respawn_count} in #{diff} sec, limit #{respawn_limit} in #{respawn_limit_seconds} sec)"
+ @officer_count -= 1
+ proc_name "captain[#{label}] (error)"
+ break
+ else
+ RESPAWNS[n] = []
+ end
+ end
+ end
+ officer = Navy::Officer.new(self, n, officer_job)
before_fork.call(self, officer) if before_fork
if pid = fork
OFFICERS[pid] = officer
+ RESPAWNS[n] ||= []
+ RESPAWNS[n].push(Time.now)
else
after_fork.call(self, officer) if after_fork
officer.start
exit
end
@@ -177,14 +198,14 @@
logger.error(e) rescue nil
exit!
end
# Reconciles the number of officers tracked in OFFICERS with the target count
# (@number in the old version of this file, @officer_count in the new one).
# Returns immediately when the two match, delegates to spawn_missing_officers
# when there are too few, and otherwise sends :QUIT to every officer whose
# number is at or above the target.
def maintain_officer_count
- (off = OFFICERS.size - @number) == 0 and return
+ (off = OFFICERS.size - @officer_count) == 0 and return
off < 0 and return spawn_missing_officers
# iterates over a copy of OFFICERS; any error raised while signalling an
# officer is swallowed by the trailing `rescue nil`
OFFICERS.dup.each_pair { |opid,o|
- o.number >= @number and kill_officer(:QUIT, opid) rescue nil
+ o.number >= @officer_count and kill_officer(:QUIT, opid) rescue nil
}
end
# delivers a signal to an officer and fails gracefully if the officer
# is no longer running.
@@ -204,6 +225,7 @@
SELF_PIPE.each { |io| io.close rescue nil }
SELF_PIPE.replace(Kgio::Pipe.new)
SELF_PIPE.each { |io| io.fcntl(Fcntl::F_SETFD, Fcntl::FD_CLOEXEC) }
end
-end
+end
+require 'navy/captain/orders'
\ No newline at end of file