lib/pitchfork/http_server.rb in pitchfork-0.14.0 vs lib/pitchfork/http_server.rb in pitchfork-0.15.0

- old
+ new

@@ -1,9 +1,10 @@ # -*- encoding: binary -*- # frozen_string_literal: true require 'pitchfork/pitchfork_http' +require 'pitchfork/listeners' require 'pitchfork/flock' require 'pitchfork/soft_timeout' require 'pitchfork/shared_memory' require 'pitchfork/info' @@ -77,23 +78,22 @@ # :stopdoc: attr_accessor :app, :timeout, :timeout_signal, :soft_timeout, :cleanup_timeout, :spawn_timeout, :worker_processes, :before_fork, :after_worker_fork, :after_mold_fork, :before_service_worker_ready, :before_service_worker_exit, :listener_opts, :children, - :orig_app, :config, :ready_pipe, - :default_middleware, :early_hints + :orig_app, :config, :ready_pipe, :early_hints attr_writer :after_worker_exit, :before_worker_exit, :after_worker_ready, :after_request_complete, :refork_condition, :after_worker_timeout, :after_worker_hard_timeout, :after_monitor_ready attr_reader :logger include Pitchfork::SocketHelper include Pitchfork::HttpResponse # all bound listener sockets # note: this is public used by raindrops, but not recommended for use # in new projects - LISTENERS = [] + LISTENERS = Listeners.new NOOP = '.' # Creates a working server on host:port (strange things happen if # port isn't a Number). Use HttpServer::run to start the server and @@ -194,32 +194,24 @@ end # replaces current listener set with +listeners+. This will # close the socket if it will not exist in the new listener set def listeners=(listeners) + unless LISTENERS.empty? + raise "Listeners can only be initialized once" + end + cur_names, dead_names = [], [] listener_names.each do |name| if name.start_with?('/') # mark unlinked sockets as dead so we can rebind them (File.socket?(name) ? cur_names : dead_names) << name else cur_names << name end end - set_names = listener_names(listeners) - dead_names.concat(cur_names - set_names).uniq! - - LISTENERS.delete_if do |io| - if dead_names.include?(sock_name(io)) - (io.close rescue nil).nil? # true - else - set_server_sockopt(io, listener_opts[sock_name(io)]) - false - end - end - - (set_names - cur_names).each { |addr| listen(addr) } + listener_names(listeners).each { |addr| listen(addr) } end def logger=(obj) Pitchfork::HttpParser::DEFAULTS["rack.logger"] = @logger = obj end @@ -231,25 +223,29 @@ # to retry, and +:delay+ may be specified as the time in seconds # to delay between retries. # A negative value for +:tries+ indicates the listen will be # retried indefinitely, this is useful when workers belonging to # different masters are spawned during a transparent upgrade. - def listen(address, opt = {}.merge(listener_opts[address] || {})) + def listen(address, opt = listener_opts[address] || {}) address = config.expand_addr(address) return if String === address && listener_names.include?(address) + opt = opt.dup delay = opt[:delay] || 0.5 tries = opt[:tries] || 5 + queues = opt[:queues] ||= 1 + opt[:reuseport] = true if queues > 1 + begin io = bind_listen(address, opt) unless TCPServer === io || UNIXServer === io io.autoclose = false io = server_cast(io) end logger.info "listening on addr=#{sock_name(io)} fd=#{io.fileno}" Info.keep_io(io) - LISTENERS << io + LISTENERS << io unless queues > 1 io rescue Errno::EADDRINUSE => err logger.error "adding listener failed addr=#{address} (in use)" raise err if tries == 0 tries -= 1 @@ -259,10 +255,33 @@ retry rescue => err logger.fatal "error adding listener addr=#{address}" raise err end + + if queues > 1 + ios = [io] + + (queues - 1).times do + io = bind_listen(address, opt) + unless TCPServer === io || UNIXServer === io + io.autoclose = false + io = server_cast(io) + end + logger.info "listening on addr=#{sock_name(io)} fd=#{io.fileno} (SO_REUSEPORT)" + Info.keep_io(io) + ios << io + rescue => err + logger.fatal "error adding listener addr=#{address}" + raise err + end + + io = Listeners::Group.new(ios, queues_per_worker: opt[:queues_per_worker] || queues - 1) + LISTENERS << io + end + + io end # monitors children and receives signals forever # (or until a termination signal is sent). This handles signals # one-at-a-time time and we'll happily drop signals in case somebody @@ -369,11 +388,12 @@ def stop(graceful = true) proc_name role: 'monitor', status: 'shutting down' @respawn = false SharedMemory.shutting_down! wait_for_pending_workers - self.listeners = [] + LISTENERS.each(&:close).clear + limit = Pitchfork.time_now + timeout until @children.empty? || Pitchfork.time_now > limit if graceful @children.soft_kill_all(:TERM) else @@ -895,10 +915,10 @@ after_worker_fork.call(self, worker) # can drop perms and create listeners LISTENERS.each { |sock| sock.close_on_exec = true } @config = nil @listener_opts = @orig_app = nil - readers = LISTENERS.dup + readers = LISTENERS.for_worker(worker.nr) readers << worker trap(:QUIT) { nuke_listeners!(readers) } trap(:TERM) { nuke_listeners!(readers) } trap(:INT) { nuke_listeners!(readers); exit!(0) } readers