lib/pitchfork/http_server.rb in pitchfork-0.14.0 vs lib/pitchfork/http_server.rb in pitchfork-0.15.0
- old
+ new
@@ -1,9 +1,10 @@
# -*- encoding: binary -*-
# frozen_string_literal: true
require 'pitchfork/pitchfork_http'
+require 'pitchfork/listeners'
require 'pitchfork/flock'
require 'pitchfork/soft_timeout'
require 'pitchfork/shared_memory'
require 'pitchfork/info'
@@ -77,23 +78,22 @@
# :stopdoc:
attr_accessor :app, :timeout, :timeout_signal, :soft_timeout, :cleanup_timeout, :spawn_timeout, :worker_processes,
:before_fork, :after_worker_fork, :after_mold_fork, :before_service_worker_ready, :before_service_worker_exit,
:listener_opts, :children,
- :orig_app, :config, :ready_pipe,
- :default_middleware, :early_hints
+ :orig_app, :config, :ready_pipe, :early_hints
attr_writer :after_worker_exit, :before_worker_exit, :after_worker_ready, :after_request_complete,
:refork_condition, :after_worker_timeout, :after_worker_hard_timeout, :after_monitor_ready
attr_reader :logger
include Pitchfork::SocketHelper
include Pitchfork::HttpResponse
# all bound listener sockets
# note: this is public used by raindrops, but not recommended for use
# in new projects
- LISTENERS = []
+ LISTENERS = Listeners.new
NOOP = '.'
# Creates a working server on host:port (strange things happen if
# port isn't a Number). Use HttpServer::run to start the server and
@@ -194,32 +194,24 @@
end
# replaces current listener set with +listeners+. This will
# close the socket if it will not exist in the new listener set
def listeners=(listeners)
+ unless LISTENERS.empty?
+ raise "Listeners can only be initialized once"
+ end
+
cur_names, dead_names = [], []
listener_names.each do |name|
if name.start_with?('/')
# mark unlinked sockets as dead so we can rebind them
(File.socket?(name) ? cur_names : dead_names) << name
else
cur_names << name
end
end
- set_names = listener_names(listeners)
- dead_names.concat(cur_names - set_names).uniq!
-
- LISTENERS.delete_if do |io|
- if dead_names.include?(sock_name(io))
- (io.close rescue nil).nil? # true
- else
- set_server_sockopt(io, listener_opts[sock_name(io)])
- false
- end
- end
-
- (set_names - cur_names).each { |addr| listen(addr) }
+ listener_names(listeners).each { |addr| listen(addr) }
end
def logger=(obj)
Pitchfork::HttpParser::DEFAULTS["rack.logger"] = @logger = obj
end
@@ -231,25 +223,29 @@
# to retry, and +:delay+ may be specified as the time in seconds
# to delay between retries.
# A negative value for +:tries+ indicates the listen will be
# retried indefinitely, this is useful when workers belonging to
# different masters are spawned during a transparent upgrade.
- def listen(address, opt = {}.merge(listener_opts[address] || {}))
+ def listen(address, opt = listener_opts[address] || {})
address = config.expand_addr(address)
return if String === address && listener_names.include?(address)
+ opt = opt.dup
delay = opt[:delay] || 0.5
tries = opt[:tries] || 5
+ queues = opt[:queues] ||= 1
+ opt[:reuseport] = true if queues > 1
+
begin
io = bind_listen(address, opt)
unless TCPServer === io || UNIXServer === io
io.autoclose = false
io = server_cast(io)
end
logger.info "listening on addr=#{sock_name(io)} fd=#{io.fileno}"
Info.keep_io(io)
- LISTENERS << io
+ LISTENERS << io unless queues > 1
io
rescue Errno::EADDRINUSE => err
logger.error "adding listener failed addr=#{address} (in use)"
raise err if tries == 0
tries -= 1
@@ -259,10 +255,33 @@
retry
rescue => err
logger.fatal "error adding listener addr=#{address}"
raise err
end
+
+ if queues > 1
+ ios = [io]
+
+ (queues - 1).times do
+ io = bind_listen(address, opt)
+ unless TCPServer === io || UNIXServer === io
+ io.autoclose = false
+ io = server_cast(io)
+ end
+ logger.info "listening on addr=#{sock_name(io)} fd=#{io.fileno} (SO_REUSEPORT)"
+ Info.keep_io(io)
+ ios << io
+ rescue => err
+ logger.fatal "error adding listener addr=#{address}"
+ raise err
+ end
+
+ io = Listeners::Group.new(ios, queues_per_worker: opt[:queues_per_worker] || queues - 1)
+ LISTENERS << io
+ end
+
+ io
end
# monitors children and receives signals forever
# (or until a termination signal is sent). This handles signals
# one-at-a-time time and we'll happily drop signals in case somebody
@@ -369,11 +388,12 @@
def stop(graceful = true)
proc_name role: 'monitor', status: 'shutting down'
@respawn = false
SharedMemory.shutting_down!
wait_for_pending_workers
- self.listeners = []
+ LISTENERS.each(&:close).clear
+
limit = Pitchfork.time_now + timeout
until @children.empty? || Pitchfork.time_now > limit
if graceful
@children.soft_kill_all(:TERM)
else
@@ -895,10 +915,10 @@
after_worker_fork.call(self, worker) # can drop perms and create listeners
LISTENERS.each { |sock| sock.close_on_exec = true }
@config = nil
@listener_opts = @orig_app = nil
- readers = LISTENERS.dup
+ readers = LISTENERS.for_worker(worker.nr)
readers << worker
trap(:QUIT) { nuke_listeners!(readers) }
trap(:TERM) { nuke_listeners!(readers) }
trap(:INT) { nuke_listeners!(readers); exit!(0) }
readers