lib/einhorn/command.rb in einhorn-0.8.2 vs lib/einhorn/command.rb in einhorn-1.0.0
- old
+ new
@@ -1,39 +1,37 @@
-require 'pp'
-require 'set'
-require 'tmpdir'
+require "pp"
+require "set"
+require "tmpdir"
-require 'einhorn/command/interface'
-require 'einhorn/prctl'
+require "einhorn/command/interface"
+require "einhorn/prctl"
module Einhorn
module Command
def self.reap
- begin
- while true
- Einhorn.log_debug('Going to reap a child process')
- pid = Process.wait(-1, Process::WNOHANG)
- return unless pid
- cleanup(pid)
- Einhorn::Event.break_loop
- end
- rescue Errno::ECHILD
+ loop do
+ Einhorn.log_debug("Going to reap a child process")
+ pid = Process.wait(-1, Process::WNOHANG)
+ return unless pid
+ cleanup(pid)
+ Einhorn::Event.break_loop
end
+ rescue Errno::ECHILD
end
def self.cleanup(pid)
- unless spec = Einhorn::State.children[pid]
+ unless (spec = Einhorn::State.children[pid])
Einhorn.log_error("Could not find any config for exited child #{pid.inspect}! This probably indicates a bug in Einhorn.")
return
end
Einhorn::State.children.delete(pid)
# Unacked worker
if spec[:type] == :worker && !spec[:acked]
Einhorn::State.consecutive_deaths_before_ack += 1
- extra = ' before it was ACKed'
+ extra = " before it was ACKed"
else
extra = nil
end
case type = spec[:type]
@@ -45,11 +43,11 @@
Einhorn.log_error("===> Exited process #{pid.inspect} has unrecgonized type #{type.inspect}: #{spec.inspect}", :upgrade)
end
end
def self.register_ping(pid, request_id)
- unless spec = Einhorn::State.children[pid]
+ unless (spec = Einhorn::State.children[pid])
Einhorn.log_error("Could not find state for PID #{pid.inspect}; ignoring ACK.")
return
end
spec[:pinged_at] = Time.now
@@ -82,41 +80,39 @@
Einhorn.log_info("Worker #{pid.inspect} has been up for #{time}s, so we are considering it alive.")
register_ack(pid)
end
def self.register_ack(pid)
- unless spec = Einhorn::State.children[pid]
+ unless (spec = Einhorn::State.children[pid])
Einhorn.log_error("Could not find state for PID #{pid.inspect}; ignoring ACK.")
return
end
if spec[:acked]
Einhorn.log_error("Pid #{pid.inspect} already ACKed; ignoring new ACK.")
return
end
- if Einhorn::State.consecutive_deaths_before_ack > 0
- extra = ", breaking the streak of #{Einhorn::State.consecutive_deaths_before_ack} consecutive unacked workers dying"
- else
- extra = nil
+ extra = if Einhorn::State.consecutive_deaths_before_ack > 0
+ ", breaking the streak of #{Einhorn::State.consecutive_deaths_before_ack} consecutive unacked workers dying"
end
Einhorn::State.consecutive_deaths_before_ack = 0
spec[:acked] = true
Einhorn.log_info("Up to #{Einhorn::WorkerPool.ack_count} / #{Einhorn::WorkerPool.ack_target} #{Einhorn::State.ack_mode[:type]} ACKs#{extra}")
# Could call cull here directly instead, I believe.
Einhorn::Event.break_loop
end
- def self.signal_all(signal, children=nil, record=true)
+ def self.signal_all(signal, children = nil, record = true)
children ||= Einhorn::WorkerPool.workers
signaled = {}
Einhorn.log_info("Sending #{signal} to #{children.inspect}", :upgrade)
children.each do |child|
- unless spec = Einhorn::State.children[child]
+ unless (spec = Einhorn::State.children[child])
Einhorn.log_error("Trying to send #{signal} to dead child #{child.inspect}. The fact we tried this probably indicates a bug in Einhorn.", :upgrade)
next
end
if record
@@ -148,43 +144,42 @@
next
end
Einhorn.log_info("Child #{child.inspect} is still active after #{Einhorn::State.signal_timeout}s. Sending SIGKILL.")
begin
- Process.kill('KILL', child)
+ Process.kill("KILL", child)
rescue Errno::ESRCH
end
- spec[:signaled].add('KILL')
+ spec[:signaled].add("KILL")
end
end
Einhorn.log_info("Successfully sent #{signal}s to #{signaled.length} processes: #{signaled.keys}")
end
end
-
def self.increment
Einhorn::Event.break_loop
old = Einhorn::State.config[:number]
new = (Einhorn::State.config[:number] += 1)
output = "Incrementing number of workers from #{old} -> #{new}"
- $stderr.puts(output)
+ warn(output)
output
end
def self.decrement
if Einhorn::State.config[:number] <= 1
output = "Can't decrease number of workers (already at #{Einhorn::State.config[:number]}). Run kill #{$$} if you really want to kill einhorn."
- $stderr.puts(output)
+ warn(output)
return output
end
Einhorn::Event.break_loop
old = Einhorn::State.config[:number]
new = (Einhorn::State.config[:number] -= 1)
output = "Decrementing number of workers from #{old} -> #{new}"
- $stderr.puts(output)
+ warn(output)
output
end
def self.set_workers(new)
if new == Einhorn::State.config[:number]
@@ -193,28 +188,28 @@
Einhorn::Event.break_loop
old = Einhorn::State.config[:number]
Einhorn::State.config[:number] = new
output = "Altering worker count, #{old} -> #{new}. Will "
- if old < new
- output << "spin up additional workers."
+ output << if old < new
+ "spin up additional workers."
else
- output << "gracefully terminate workers."
+ "gracefully terminate workers."
end
- $stderr.puts(output)
+ warn(output)
output
end
def self.dumpable_state
global_state = Einhorn::State.dumpable_state
descriptor_state = Einhorn::Event.persistent_descriptors.map do |descriptor|
descriptor.to_state
end
{
- :state => global_state,
- :persistent_descriptors => descriptor_state,
+ state: global_state,
+ persistent_descriptors: descriptor_state
}
end
def self.reload
unless Einhorn::State.respawn
@@ -255,12 +250,12 @@
return
end
begin
Einhorn.initialize_reload_environment
- respawn_commandline = Einhorn.upgrade_commandline(['--with-state-fd', read.fileno.to_s])
- respawn_commandline << { :close_others => false }
+ respawn_commandline = Einhorn.upgrade_commandline(["--with-state-fd", read.fileno.to_s])
+ respawn_commandline << {close_others: false}
Einhorn.log_info("About to re-exec einhorn master as #{respawn_commandline.inspect}", :reload)
Einhorn::Compat.exec(*respawn_commandline)
rescue SystemCallError => e
Einhorn.log_error("Could not reload! Attempting to continue. Error was: #{e}", :reload)
Einhorn::State.reloading_for_upgrade = false
@@ -273,20 +268,20 @@
0.upto(all_indexes.length) do |i|
return i unless all_indexes.include?(i)
end
end
- def self.spinup(cmd=nil)
+ def self.spinup(cmd = nil)
cmd ||= Einhorn::State.cmd
index = next_index
expected_ppid = Process.pid
- if Einhorn::State.preloaded
- pid = fork do
+ pid = if Einhorn::State.preloaded
+ fork do
Einhorn::TransientState.whatami = :worker
prepare_child_process
- Einhorn.log_info('About to tear down Einhorn state and run einhorn_main')
+ Einhorn.log_info("About to tear down Einhorn state and run einhorn_main")
Einhorn::Command::Interface.uninit
Einhorn::Event.close_all_for_worker
Einhorn.set_argv(cmd, true)
reseed_random
@@ -295,11 +290,11 @@
prepare_child_environment(index)
einhorn_main
end
else
- pid = fork do
+ fork do
Einhorn::TransientState.whatami = :worker
prepare_child_process
Einhorn.log_info("About to exec #{cmd.inspect}")
Einhorn::Command::Interface.uninit
@@ -312,57 +307,52 @@
Einhorn::Event.close_all_for_worker
setup_parent_watch(expected_ppid)
prepare_child_environment(index)
- Einhorn::Compat.exec(cmd[0], cmd[1..-1], :close_others => false)
+ Einhorn::Compat.exec(cmd[0], cmd[1..-1], close_others: false)
end
end
Einhorn.log_info("===> Launched #{pid} (index: #{index})", :upgrade)
Einhorn::State.last_spinup = Time.now
Einhorn::State.children[pid] = {
- :type => :worker,
- :version => Einhorn::State.version,
- :acked => false,
- :signaled => Set.new,
- :last_signaled_at => nil,
- :index => index,
- :spinup_time => Einhorn::State.last_spinup,
+ type: :worker,
+ version: Einhorn::State.version,
+ acked: false,
+ signaled: Set.new,
+ last_signaled_at: nil,
+ index: index,
+ spinup_time: Einhorn::State.last_spinup
}
# Set up whatever's needed for ACKing
ack_mode = Einhorn::State.ack_mode
case type = ack_mode[:type]
when :timer
Einhorn::Event::ACKTimer.open(ack_mode[:timeout], pid)
when :manual
+ # nothing to do
else
Einhorn.log_error("Unrecognized ACK mode #{type.inspect}")
end
end
def self.prepare_child_environment(index)
# This is run from the child
- ENV['EINHORN_MASTER_PID'] = Process.ppid.to_s
- ENV['EINHORN_SOCK_PATH'] = Einhorn::Command::Interface.socket_path
+ ENV["EINHORN_MASTER_PID"] = Process.ppid.to_s
+ ENV["EINHORN_SOCK_PATH"] = Einhorn::Command::Interface.socket_path
if Einhorn::State.command_socket_as_fd
socket = UNIXSocket.open(Einhorn::Command::Interface.socket_path)
Einhorn::TransientState.socket_handles << socket
- ENV['EINHORN_SOCK_FD'] = socket.fileno.to_s
+ ENV["EINHORN_SOCK_FD"] = socket.fileno.to_s
end
- ENV['EINHORN_FD_COUNT'] = Einhorn::State.bind_fds.length.to_s
- Einhorn::State.bind_fds.each_with_index {|fd, i| ENV["EINHORN_FD_#{i}"] = fd.to_s}
+ ENV["EINHORN_FD_COUNT"] = Einhorn::State.bind_fds.length.to_s
+ Einhorn::State.bind_fds.each_with_index { |fd, i| ENV["EINHORN_FD_#{i}"] = fd.to_s }
- ENV['EINHORN_CHILD_INDEX'] = index.to_s
-
- # EINHORN_FDS is deprecated. It was originally an attempt to
- # match Upstart's nominal internal support for space-separated
- # FD lists, but nobody uses that in practice, and it makes
- # finding individual FDs more difficult
- ENV['EINHORN_FDS'] = Einhorn::State.bind_fds.map(&:to_s).join(' ')
+ ENV["EINHORN_CHILD_INDEX"] = index.to_s
end
# Reseed common ruby random number generators.
#
# OpenSSL::Random uses the PID to reseed after fork, which means that if a
@@ -381,15 +371,15 @@
# reseed Kernel#rand
srand
# reseed OpenSSL::Random if it's loaded
if defined?(OpenSSL::Random)
- if defined?(Random)
- seed = Random.new_seed
+ seed = if defined?(Random)
+ Random.new_seed
else
# Ruby 1.8
- seed = rand
+ rand
end
OpenSSL::Random.seed(seed.to_s)
end
end
@@ -397,18 +387,18 @@
Process.setpgrp
Einhorn.renice_self
end
def self.setup_parent_watch(expected_ppid)
- if Einhorn::State.kill_children_on_exit then
+ if Einhorn::State.kill_children_on_exit
begin
# NB: Having the USR2 signal handler set to terminate (the default) at
# this point is required. If it's set to a ruby handler, there are
# race conditions that could cause the worker to leak.
Einhorn::Prctl.set_pdeathsig("USR2")
- if Process.ppid != expected_ppid then
+ if Process.ppid != expected_ppid
Einhorn.log_error("Parent process died before we set pdeathsig; cowardly refusing to exec child process.")
exit(1)
end
rescue NotImplementedError
# Unsupported OS; silently continue.
@@ -422,22 +412,23 @@
# fleet upgrade. In a smooth upgrade, bring up new workers and cull old
# workers one by one as soon as there is a replacement. In a fleet
# upgrade, bring up all the new workers and don't cull any old workers
# until they're all up.
#
- def self.full_upgrade(options={})
- options = {:smooth => false}.merge(options)
+ def self.full_upgrade(options = {})
+ options = {smooth: false}.merge(options)
Einhorn::State.smooth_upgrade = options.fetch(:smooth)
reload_for_upgrade
end
def self.full_upgrade_smooth
- full_upgrade(:smooth => true)
+ full_upgrade(smooth: true)
end
+
def self.full_upgrade_fleet
- full_upgrade(:smooth => false)
+ full_upgrade(smooth: false)
end
def self.reload_for_upgrade
Einhorn::State.reloading_for_upgrade = true
reload
@@ -446,12 +437,12 @@
def self.upgrade_workers
if Einhorn::State.upgrading
Einhorn.log_info("Currently upgrading (#{Einhorn::WorkerPool.ack_count} / #{Einhorn::WorkerPool.ack_target} ACKs; bumping version and starting over)...", :upgrade)
else
Einhorn::State.upgrading = true
- u_type = Einhorn::State.smooth_upgrade ? 'smooth' : 'fleet'
- Einhorn.log_info("Starting #{u_type} upgrade from version" +
+ u_type = Einhorn::State.smooth_upgrade ? "smooth" : "fleet"
+ Einhorn.log_info("Starting #{u_type} upgrade from version" \
" #{Einhorn::State.version}...", :upgrade)
end
# Reset this, since we've just upgraded to a new universe (I'm
# not positive this is the right behavior, but it's not
@@ -494,28 +485,28 @@
Einhorn.log_debug("Not killing old workers, as excess is #{excess}.")
end
end
if unsignaled > target
- excess = Einhorn::WorkerPool.unsignaled_modern_workers_with_priority[0...(unsignaled-target)]
+ excess = Einhorn::WorkerPool.unsignaled_modern_workers_with_priority[0...(unsignaled - target)]
Einhorn.log_info("Have too many workers at the current version, so killing off #{excess.length} of them.")
signal_all("USR2", excess)
end
# Ensure all signaled workers that have outlived signal_timeout get killed.
kill_expired_signaled_workers if Einhorn::State.signal_timeout
end
def self.kill_expired_signaled_workers
now = Time.now
- children = Einhorn::State.children.select do |_,c|
+ children = Einhorn::State.children.select do |_, c|
# Only interested in USR2 signaled workers
next unless c[:signaled] && c[:signaled].length > 0
- next unless c[:signaled].include?('USR2')
+ next unless c[:signaled].include?("USR2")
# Ignore processes that have received KILL since it can't be trapped.
- next if c[:signaled].include?('KILL')
+ next if c[:signaled].include?("KILL")
# Filter out those children that have not reached signal_timeout yet.
next unless c[:last_signaled_at]
expires_at = c[:last_signaled_at] + Einhorn::State.signal_timeout
next unless now >= expires_at
@@ -525,16 +516,16 @@
Einhorn.log_info("#{children.size} expired signaled workers found.") if children.size > 0
children.each do |pid, child|
Einhorn.log_info("Child #{pid.inspect} was signaled #{(child[:last_signaled_at] - now).abs.to_i}s ago. Sending SIGKILL as it is still active after #{Einhorn::State.signal_timeout}s timeout.", :upgrade)
begin
- Process.kill('KILL', pid)
+ Process.kill("KILL", pid)
rescue Errno::ESRCH
Einhorn.log_debug("Attempted to SIGKILL child #{pid.inspect} but the process does not exist.")
end
- child[:signaled].add('KILL')
+ child[:signaled].add("KILL")
child[:last_signaled_at] = Time.now
end
end
def self.stop_respawning
@@ -557,20 +548,20 @@
if missing <= 0
Einhorn.log_error("Missing is currently #{missing.inspect}, but should always be > 0 when replenish_immediately is called. This probably indicates a bug in Einhorn.")
return
end
Einhorn.log_info("Launching #{missing} new workers")
- missing.times {spinup}
+ missing.times { spinup }
end
# Unbounded exponential backoff is not a thing: we run into problems if
# e.g., each of our hundred workers simultaneously fail to boot for the same
# ephemeral reason. Instead cap backoff by some reasonable maximum, so we
# don't wait until the heat death of the universe to spin up new capacity.
MAX_SPINUP_INTERVAL = 30.0
- def self.replenish_gradually(max_unacked=nil)
+ def self.replenish_gradually(max_unacked = nil)
return if Einhorn::TransientState.has_outstanding_spinup_timer
return unless Einhorn::WorkerPool.missing_worker_count > 0
max_unacked ||= Einhorn::State.config[:max_unacked]
@@ -589,11 +580,11 @@
raise ArgumentError.new("max_unacked must be positive")
end
# Exponentially backoff automated spinup if we're just having
# things die before ACKing
- spinup_interval = Einhorn::State.config[:seconds] * (1.5 ** Einhorn::State.consecutive_deaths_before_ack)
+ spinup_interval = Einhorn::State.config[:seconds] * (1.5**Einhorn::State.consecutive_deaths_before_ack)
spinup_interval = [spinup_interval, MAX_SPINUP_INTERVAL].min
seconds_ago = (Time.now - Einhorn::State.last_spinup).to_f
if seconds_ago > spinup_interval
if trigger_spinup?(max_unacked)
@@ -616,17 +607,17 @@
Einhorn::TransientState.has_outstanding_spinup_timer = false
replenish
end
end
- def self.quieter(log=true)
+ def self.quieter(log = true)
Einhorn::State.verbosity += 1 if Einhorn::State.verbosity < 2
output = "Verbosity set to #{Einhorn::State.verbosity}"
Einhorn.log_info(output) if log
output
end
- def self.louder(log=true)
+ def self.louder(log = true)
Einhorn::State.verbosity -= 1 if Einhorn::State.verbosity > 0
output = "Verbosity set to #{Einhorn::State.verbosity}"
Einhorn.log_info(output) if log
output
end