lib/einhorn/command.rb in einhorn-0.8.2 vs lib/einhorn/command.rb in einhorn-1.0.0

- old
+ new

@@ -1,39 +1,37 @@ -require 'pp' -require 'set' -require 'tmpdir' +require "pp" +require "set" +require "tmpdir" -require 'einhorn/command/interface' -require 'einhorn/prctl' +require "einhorn/command/interface" +require "einhorn/prctl" module Einhorn module Command def self.reap - begin - while true - Einhorn.log_debug('Going to reap a child process') - pid = Process.wait(-1, Process::WNOHANG) - return unless pid - cleanup(pid) - Einhorn::Event.break_loop - end - rescue Errno::ECHILD + loop do + Einhorn.log_debug("Going to reap a child process") + pid = Process.wait(-1, Process::WNOHANG) + return unless pid + cleanup(pid) + Einhorn::Event.break_loop end + rescue Errno::ECHILD end def self.cleanup(pid) - unless spec = Einhorn::State.children[pid] + unless (spec = Einhorn::State.children[pid]) Einhorn.log_error("Could not find any config for exited child #{pid.inspect}! This probably indicates a bug in Einhorn.") return end Einhorn::State.children.delete(pid) # Unacked worker if spec[:type] == :worker && !spec[:acked] Einhorn::State.consecutive_deaths_before_ack += 1 - extra = ' before it was ACKed' + extra = " before it was ACKed" else extra = nil end case type = spec[:type] @@ -45,11 +43,11 @@ Einhorn.log_error("===> Exited process #{pid.inspect} has unrecgonized type #{type.inspect}: #{spec.inspect}", :upgrade) end end def self.register_ping(pid, request_id) - unless spec = Einhorn::State.children[pid] + unless (spec = Einhorn::State.children[pid]) Einhorn.log_error("Could not find state for PID #{pid.inspect}; ignoring ACK.") return end spec[:pinged_at] = Time.now @@ -82,41 +80,39 @@ Einhorn.log_info("Worker #{pid.inspect} has been up for #{time}s, so we are considering it alive.") register_ack(pid) end def self.register_ack(pid) - unless spec = Einhorn::State.children[pid] + unless (spec = Einhorn::State.children[pid]) Einhorn.log_error("Could not find state for PID #{pid.inspect}; ignoring ACK.") return end if spec[:acked] Einhorn.log_error("Pid #{pid.inspect} already ACKed; ignoring new ACK.") return end - if Einhorn::State.consecutive_deaths_before_ack > 0 - extra = ", breaking the streak of #{Einhorn::State.consecutive_deaths_before_ack} consecutive unacked workers dying" - else - extra = nil + extra = if Einhorn::State.consecutive_deaths_before_ack > 0 + ", breaking the streak of #{Einhorn::State.consecutive_deaths_before_ack} consecutive unacked workers dying" end Einhorn::State.consecutive_deaths_before_ack = 0 spec[:acked] = true Einhorn.log_info("Up to #{Einhorn::WorkerPool.ack_count} / #{Einhorn::WorkerPool.ack_target} #{Einhorn::State.ack_mode[:type]} ACKs#{extra}") # Could call cull here directly instead, I believe. Einhorn::Event.break_loop end - def self.signal_all(signal, children=nil, record=true) + def self.signal_all(signal, children = nil, record = true) children ||= Einhorn::WorkerPool.workers signaled = {} Einhorn.log_info("Sending #{signal} to #{children.inspect}", :upgrade) children.each do |child| - unless spec = Einhorn::State.children[child] + unless (spec = Einhorn::State.children[child]) Einhorn.log_error("Trying to send #{signal} to dead child #{child.inspect}. The fact we tried this probably indicates a bug in Einhorn.", :upgrade) next end if record @@ -148,43 +144,42 @@ next end Einhorn.log_info("Child #{child.inspect} is still active after #{Einhorn::State.signal_timeout}s. Sending SIGKILL.") begin - Process.kill('KILL', child) + Process.kill("KILL", child) rescue Errno::ESRCH end - spec[:signaled].add('KILL') + spec[:signaled].add("KILL") end end Einhorn.log_info("Successfully sent #{signal}s to #{signaled.length} processes: #{signaled.keys}") end end - def self.increment Einhorn::Event.break_loop old = Einhorn::State.config[:number] new = (Einhorn::State.config[:number] += 1) output = "Incrementing number of workers from #{old} -> #{new}" - $stderr.puts(output) + warn(output) output end def self.decrement if Einhorn::State.config[:number] <= 1 output = "Can't decrease number of workers (already at #{Einhorn::State.config[:number]}). Run kill #{$$} if you really want to kill einhorn." - $stderr.puts(output) + warn(output) return output end Einhorn::Event.break_loop old = Einhorn::State.config[:number] new = (Einhorn::State.config[:number] -= 1) output = "Decrementing number of workers from #{old} -> #{new}" - $stderr.puts(output) + warn(output) output end def self.set_workers(new) if new == Einhorn::State.config[:number] @@ -193,28 +188,28 @@ Einhorn::Event.break_loop old = Einhorn::State.config[:number] Einhorn::State.config[:number] = new output = "Altering worker count, #{old} -> #{new}. Will " - if old < new - output << "spin up additional workers." + output << if old < new + "spin up additional workers." else - output << "gracefully terminate workers." + "gracefully terminate workers." end - $stderr.puts(output) + warn(output) output end def self.dumpable_state global_state = Einhorn::State.dumpable_state descriptor_state = Einhorn::Event.persistent_descriptors.map do |descriptor| descriptor.to_state end { - :state => global_state, - :persistent_descriptors => descriptor_state, + state: global_state, + persistent_descriptors: descriptor_state } end def self.reload unless Einhorn::State.respawn @@ -255,12 +250,12 @@ return end begin Einhorn.initialize_reload_environment - respawn_commandline = Einhorn.upgrade_commandline(['--with-state-fd', read.fileno.to_s]) - respawn_commandline << { :close_others => false } + respawn_commandline = Einhorn.upgrade_commandline(["--with-state-fd", read.fileno.to_s]) + respawn_commandline << {close_others: false} Einhorn.log_info("About to re-exec einhorn master as #{respawn_commandline.inspect}", :reload) Einhorn::Compat.exec(*respawn_commandline) rescue SystemCallError => e Einhorn.log_error("Could not reload! Attempting to continue. Error was: #{e}", :reload) Einhorn::State.reloading_for_upgrade = false @@ -273,20 +268,20 @@ 0.upto(all_indexes.length) do |i| return i unless all_indexes.include?(i) end end - def self.spinup(cmd=nil) + def self.spinup(cmd = nil) cmd ||= Einhorn::State.cmd index = next_index expected_ppid = Process.pid - if Einhorn::State.preloaded - pid = fork do + pid = if Einhorn::State.preloaded + fork do Einhorn::TransientState.whatami = :worker prepare_child_process - Einhorn.log_info('About to tear down Einhorn state and run einhorn_main') + Einhorn.log_info("About to tear down Einhorn state and run einhorn_main") Einhorn::Command::Interface.uninit Einhorn::Event.close_all_for_worker Einhorn.set_argv(cmd, true) reseed_random @@ -295,11 +290,11 @@ prepare_child_environment(index) einhorn_main end else - pid = fork do + fork do Einhorn::TransientState.whatami = :worker prepare_child_process Einhorn.log_info("About to exec #{cmd.inspect}") Einhorn::Command::Interface.uninit @@ -312,57 +307,52 @@ Einhorn::Event.close_all_for_worker setup_parent_watch(expected_ppid) prepare_child_environment(index) - Einhorn::Compat.exec(cmd[0], cmd[1..-1], :close_others => false) + Einhorn::Compat.exec(cmd[0], cmd[1..-1], close_others: false) end end Einhorn.log_info("===> Launched #{pid} (index: #{index})", :upgrade) Einhorn::State.last_spinup = Time.now Einhorn::State.children[pid] = { - :type => :worker, - :version => Einhorn::State.version, - :acked => false, - :signaled => Set.new, - :last_signaled_at => nil, - :index => index, - :spinup_time => Einhorn::State.last_spinup, + type: :worker, + version: Einhorn::State.version, + acked: false, + signaled: Set.new, + last_signaled_at: nil, + index: index, + spinup_time: Einhorn::State.last_spinup } # Set up whatever's needed for ACKing ack_mode = Einhorn::State.ack_mode case type = ack_mode[:type] when :timer Einhorn::Event::ACKTimer.open(ack_mode[:timeout], pid) when :manual + # nothing to do else Einhorn.log_error("Unrecognized ACK mode #{type.inspect}") end end def self.prepare_child_environment(index) # This is run from the child - ENV['EINHORN_MASTER_PID'] = Process.ppid.to_s - ENV['EINHORN_SOCK_PATH'] = Einhorn::Command::Interface.socket_path + ENV["EINHORN_MASTER_PID"] = Process.ppid.to_s + ENV["EINHORN_SOCK_PATH"] = Einhorn::Command::Interface.socket_path if Einhorn::State.command_socket_as_fd socket = UNIXSocket.open(Einhorn::Command::Interface.socket_path) Einhorn::TransientState.socket_handles << socket - ENV['EINHORN_SOCK_FD'] = socket.fileno.to_s + ENV["EINHORN_SOCK_FD"] = socket.fileno.to_s end - ENV['EINHORN_FD_COUNT'] = Einhorn::State.bind_fds.length.to_s - Einhorn::State.bind_fds.each_with_index {|fd, i| ENV["EINHORN_FD_#{i}"] = fd.to_s} + ENV["EINHORN_FD_COUNT"] = Einhorn::State.bind_fds.length.to_s + Einhorn::State.bind_fds.each_with_index { |fd, i| ENV["EINHORN_FD_#{i}"] = fd.to_s } - ENV['EINHORN_CHILD_INDEX'] = index.to_s - - # EINHORN_FDS is deprecated. It was originally an attempt to - # match Upstart's nominal internal support for space-separated - # FD lists, but nobody uses that in practice, and it makes - # finding individual FDs more difficult - ENV['EINHORN_FDS'] = Einhorn::State.bind_fds.map(&:to_s).join(' ') + ENV["EINHORN_CHILD_INDEX"] = index.to_s end # Reseed common ruby random number generators. # # OpenSSL::Random uses the PID to reseed after fork, which means that if a @@ -381,15 +371,15 @@ # reseed Kernel#rand srand # reseed OpenSSL::Random if it's loaded if defined?(OpenSSL::Random) - if defined?(Random) - seed = Random.new_seed + seed = if defined?(Random) + Random.new_seed else # Ruby 1.8 - seed = rand + rand end OpenSSL::Random.seed(seed.to_s) end end @@ -397,18 +387,18 @@ Process.setpgrp Einhorn.renice_self end def self.setup_parent_watch(expected_ppid) - if Einhorn::State.kill_children_on_exit then + if Einhorn::State.kill_children_on_exit begin # NB: Having the USR2 signal handler set to terminate (the default) at # this point is required. If it's set to a ruby handler, there are # race conditions that could cause the worker to leak. Einhorn::Prctl.set_pdeathsig("USR2") - if Process.ppid != expected_ppid then + if Process.ppid != expected_ppid Einhorn.log_error("Parent process died before we set pdeathsig; cowardly refusing to exec child process.") exit(1) end rescue NotImplementedError # Unsupported OS; silently continue. @@ -422,22 +412,23 @@ # fleet upgrade. In a smooth upgrade, bring up new workers and cull old # workers one by one as soon as there is a replacement. In a fleet # upgrade, bring up all the new workers and don't cull any old workers # until they're all up. # - def self.full_upgrade(options={}) - options = {:smooth => false}.merge(options) + def self.full_upgrade(options = {}) + options = {smooth: false}.merge(options) Einhorn::State.smooth_upgrade = options.fetch(:smooth) reload_for_upgrade end def self.full_upgrade_smooth - full_upgrade(:smooth => true) + full_upgrade(smooth: true) end + def self.full_upgrade_fleet - full_upgrade(:smooth => false) + full_upgrade(smooth: false) end def self.reload_for_upgrade Einhorn::State.reloading_for_upgrade = true reload @@ -446,12 +437,12 @@ def self.upgrade_workers if Einhorn::State.upgrading Einhorn.log_info("Currently upgrading (#{Einhorn::WorkerPool.ack_count} / #{Einhorn::WorkerPool.ack_target} ACKs; bumping version and starting over)...", :upgrade) else Einhorn::State.upgrading = true - u_type = Einhorn::State.smooth_upgrade ? 'smooth' : 'fleet' - Einhorn.log_info("Starting #{u_type} upgrade from version" + + u_type = Einhorn::State.smooth_upgrade ? "smooth" : "fleet" + Einhorn.log_info("Starting #{u_type} upgrade from version" \ " #{Einhorn::State.version}...", :upgrade) end # Reset this, since we've just upgraded to a new universe (I'm # not positive this is the right behavior, but it's not @@ -494,28 +485,28 @@ Einhorn.log_debug("Not killing old workers, as excess is #{excess}.") end end if unsignaled > target - excess = Einhorn::WorkerPool.unsignaled_modern_workers_with_priority[0...(unsignaled-target)] + excess = Einhorn::WorkerPool.unsignaled_modern_workers_with_priority[0...(unsignaled - target)] Einhorn.log_info("Have too many workers at the current version, so killing off #{excess.length} of them.") signal_all("USR2", excess) end # Ensure all signaled workers that have outlived signal_timeout get killed. kill_expired_signaled_workers if Einhorn::State.signal_timeout end def self.kill_expired_signaled_workers now = Time.now - children = Einhorn::State.children.select do |_,c| + children = Einhorn::State.children.select do |_, c| # Only interested in USR2 signaled workers next unless c[:signaled] && c[:signaled].length > 0 - next unless c[:signaled].include?('USR2') + next unless c[:signaled].include?("USR2") # Ignore processes that have received KILL since it can't be trapped. - next if c[:signaled].include?('KILL') + next if c[:signaled].include?("KILL") # Filter out those children that have not reached signal_timeout yet. next unless c[:last_signaled_at] expires_at = c[:last_signaled_at] + Einhorn::State.signal_timeout next unless now >= expires_at @@ -525,16 +516,16 @@ Einhorn.log_info("#{children.size} expired signaled workers found.") if children.size > 0 children.each do |pid, child| Einhorn.log_info("Child #{pid.inspect} was signaled #{(child[:last_signaled_at] - now).abs.to_i}s ago. Sending SIGKILL as it is still active after #{Einhorn::State.signal_timeout}s timeout.", :upgrade) begin - Process.kill('KILL', pid) + Process.kill("KILL", pid) rescue Errno::ESRCH Einhorn.log_debug("Attempted to SIGKILL child #{pid.inspect} but the process does not exist.") end - child[:signaled].add('KILL') + child[:signaled].add("KILL") child[:last_signaled_at] = Time.now end end def self.stop_respawning @@ -557,20 +548,20 @@ if missing <= 0 Einhorn.log_error("Missing is currently #{missing.inspect}, but should always be > 0 when replenish_immediately is called. This probably indicates a bug in Einhorn.") return end Einhorn.log_info("Launching #{missing} new workers") - missing.times {spinup} + missing.times { spinup } end # Unbounded exponential backoff is not a thing: we run into problems if # e.g., each of our hundred workers simultaneously fail to boot for the same # ephemeral reason. Instead cap backoff by some reasonable maximum, so we # don't wait until the heat death of the universe to spin up new capacity. MAX_SPINUP_INTERVAL = 30.0 - def self.replenish_gradually(max_unacked=nil) + def self.replenish_gradually(max_unacked = nil) return if Einhorn::TransientState.has_outstanding_spinup_timer return unless Einhorn::WorkerPool.missing_worker_count > 0 max_unacked ||= Einhorn::State.config[:max_unacked] @@ -589,11 +580,11 @@ raise ArgumentError.new("max_unacked must be positive") end # Exponentially backoff automated spinup if we're just having # things die before ACKing - spinup_interval = Einhorn::State.config[:seconds] * (1.5 ** Einhorn::State.consecutive_deaths_before_ack) + spinup_interval = Einhorn::State.config[:seconds] * (1.5**Einhorn::State.consecutive_deaths_before_ack) spinup_interval = [spinup_interval, MAX_SPINUP_INTERVAL].min seconds_ago = (Time.now - Einhorn::State.last_spinup).to_f if seconds_ago > spinup_interval if trigger_spinup?(max_unacked) @@ -616,17 +607,17 @@ Einhorn::TransientState.has_outstanding_spinup_timer = false replenish end end - def self.quieter(log=true) + def self.quieter(log = true) Einhorn::State.verbosity += 1 if Einhorn::State.verbosity < 2 output = "Verbosity set to #{Einhorn::State.verbosity}" Einhorn.log_info(output) if log output end - def self.louder(log=true) + def self.louder(log = true) Einhorn::State.verbosity -= 1 if Einhorn::State.verbosity > 0 output = "Verbosity set to #{Einhorn::State.verbosity}" Einhorn.log_info(output) if log output end