lib/pitchfork.rb in pitchfork-0.7.0 vs lib/pitchfork.rb in pitchfork-0.8.0

- old
+ new

@@ -34,160 +34,193 @@ BootFailure = Class.new(StandardError) # :stopdoc: - # This returns a lambda to pass in as the app, this does not "build" the - # app The returned lambda will be called when it is - # time to build the app. - def self.builder(ru, op) - # allow Configurator to parse cli switches embedded in the ru file - op = Pitchfork::Configurator::RACKUP.merge!(:file => ru, :optparse => op) - if ru =~ /\.ru$/ && !defined?(Rack::Builder) - abort "rack and Rack::Builder must be available for processing #{ru}" + FORK_LOCK = Monitor.new + @socket_type = :SOCK_SEQPACKET + + class << self + # :startdoc: + + # Prevent Pitchfork from forking new children for the duration of the block. + # + # If you have background threads calling code that synchronize native locks, + # while the GVL is released, forking while they are held could leak to + # corrupted children. + # + # One example of this is `getaddrinfo(3)`, so opening a connection from a + # background thread has a chance to produce stuck children. + # + # To avoid this you can wrap such code in `Pitchfork.prevent_fork`: + # + # def heartbeat_thread + # @heartbeat_thread ||= Thread.new do + # loop do + # Pitchfork.prevent_fork do + # heartbeat + # end + # sleep 10 + # end + # end + # end + # + def prevent_fork(&block) + FORK_LOCK.synchronize(&block) end - # always called after config file parsing, may be called after forking - lambda do |_, server| - inner_app = case ru - when /\.ru$/ - raw = File.read(ru) - raw.sub!(/^__END__\n.*/, '') - eval("Rack::Builder.new {(\n#{raw}\n)}.to_app", TOPLEVEL_BINDING, ru) - else - require ru - Object.const_get(File.basename(ru, '.rb').capitalize) + # :stopdoc: + + # This returns a lambda to pass in as the app, this does not "build" the + # app The returned lambda will be called when it is + # time to build the app. + def builder(ru, op) + # allow Configurator to parse cli switches embedded in the ru file + op = Pitchfork::Configurator::RACKUP.merge!(:file => ru, :optparse => op) + if ru =~ /\.ru$/ && !defined?(Rack::Builder) + abort "rack and Rack::Builder must be available for processing #{ru}" end - Rack::Builder.new do - use(Rack::ContentLength) - use(Pitchfork::Chunked) - use(Rack::Lint) if ENV["RACK_ENV"] == "development" - use(Rack::TempfileReaper) - run inner_app - end.to_app + # always called after config file parsing, may be called after forking + lambda do |_, server| + inner_app = case ru + when /\.ru$/ + raw = File.read(ru) + raw.sub!(/^__END__\n.*/, '') + eval("Rack::Builder.new {(\n#{raw}\n)}.to_app", TOPLEVEL_BINDING, ru) + else + require ru + Object.const_get(File.basename(ru, '.rb').capitalize) + end + + Rack::Builder.new do + use(Rack::ContentLength) + use(Pitchfork::Chunked) + use(Rack::Lint) if ENV["RACK_ENV"] == "development" + use(Rack::TempfileReaper) + run inner_app + end.to_app + end end - end - # returns an array of strings representing TCP listen socket addresses - # and Unix domain socket paths. This is useful for use with - # Raindrops::Middleware under Linux: https://yhbt.net/raindrops/ - def self.listener_names - Pitchfork::HttpServer::LISTENERS.map do |io| - Pitchfork::SocketHelper.sock_name(io) + # returns an array of strings representing TCP listen socket addresses + # and Unix domain socket paths. This is useful for use with + # Raindrops::Middleware under Linux: https://yhbt.net/raindrops/ + def listener_names + Pitchfork::HttpServer::LISTENERS.map do |io| + Pitchfork::SocketHelper.sock_name(io) + end end - end - def self.log_error(logger, prefix, exc) - message = exc.message - message = message.dump if /[[:cntrl:]]/ =~ message - logger.error "#{prefix}: #{message} (#{exc.class})" - exc.backtrace.each { |line| logger.error(line) } - end + def log_error(logger, prefix, exc) + message = exc.message + message = message.dump if /[[:cntrl:]]/ =~ message + logger.error "#{prefix}: #{message} (#{exc.class})" + exc.backtrace.each { |line| logger.error(line) } + end - F_SETPIPE_SZ = 1031 if RUBY_PLATFORM =~ /linux/ + F_SETPIPE_SZ = 1031 if RUBY_PLATFORM =~ /linux/ - def self.pipe # :nodoc: - IO.pipe.each do |io| - # shrink pipes to minimize impact on /proc/sys/fs/pipe-user-pages-soft - # limits. - if defined?(F_SETPIPE_SZ) - begin - io.fcntl(F_SETPIPE_SZ, Raindrops::PAGE_SIZE) - rescue Errno::EINVAL - # old kernel - rescue Errno::EPERM - # resizes fail if Linux is close to the pipe limit for the user - # or if the user does not have permissions to resize + def pipe # :nodoc: + IO.pipe.each do |io| + # shrink pipes to minimize impact on /proc/sys/fs/pipe-user-pages-soft + # limits. + if defined?(F_SETPIPE_SZ) + begin + io.fcntl(F_SETPIPE_SZ, Raindrops::PAGE_SIZE) + rescue Errno::EINVAL + # old kernel + rescue Errno::EPERM + # resizes fail if Linux is close to the pipe limit for the user + # or if the user does not have permissions to resize + end end end end - end - @socket_type = :SOCK_SEQPACKET - def self.socketpair - pair = UNIXSocket.socketpair(@socket_type).map { |s| MessageSocket.new(s) } - pair[0].close_write - pair[1].close_read - pair - rescue Errno::EPROTONOSUPPORT - if @socket_type == :SOCK_SEQPACKET - # macOS and very old linuxes don't support SOCK_SEQPACKET (SCTP). - # In such case we can fallback to SOCK_STREAM (TCP) - warn("SEQPACKET (SCTP) isn't supported, falling back to STREAM") - @socket_type = :SOCK_STREAM - retry - else - raise + def socketpair + pair = UNIXSocket.socketpair(@socket_type).map { |s| MessageSocket.new(s) } + pair[0].close_write + pair[1].close_read + pair + rescue Errno::EPROTONOSUPPORT + if @socket_type == :SOCK_SEQPACKET + # macOS and very old linuxes don't support SOCK_SEQPACKET (SCTP). + # In such case we can fallback to SOCK_STREAM (TCP) + warn("SEQPACKET (SCTP) isn't supported, falling back to STREAM") + @socket_type = :SOCK_STREAM + retry + else + raise + end end - end - def self.clean_fork(setpgid: true, &block) - if pid = Process.fork - if setpgid - Process.setpgid(pid, pid) # Make into a group leader + def clean_fork(setpgid: true, &block) + if pid = FORK_LOCK.synchronize { Process.fork } + if setpgid + Process.setpgid(pid, pid) # Make into a group leader + end + return pid end - return pid - end - begin - # Pitchfork recursively refork the worker processes. - # Because of this we need to unwind the stack before resuming execution - # in the child, otherwise on each generation the available stack space would - # get smaller and smaller until it's basically 0. - # - # The very first version of this method used to call fork from a new - # thread, however this can cause issues with some native gems that rely on - # pthread_atfork(3) or pthread_mutex_lock(3), as the new main thread would - # now be different. - # - # A second version used to fork from a new fiber, but fibers have a much smaller - # stack space (https://bugs.ruby-lang.org/issues/3187), so it would break large applications. - # - # The latest version now use `throw` to unwind the stack after the fork, it however - # restrict it to be called only inside `handle_clean_fork`. - if Thread.current[:pitchfork_handle_clean_fork] - throw self, block - else - while block - block = catch(self) do - Thread.current[:pitchfork_handle_clean_fork] = true - block.call - nil + begin + # Pitchfork recursively refork the worker processes. + # Because of this we need to unwind the stack before resuming execution + # in the child, otherwise on each generation the available stack space would + # get smaller and smaller until it's basically 0. + # + # The very first version of this method used to call fork from a new + # thread, however this can cause issues with some native gems that rely on + # pthread_atfork(3) or pthread_mutex_lock(3), as the new main thread would + # now be different. + # + # A second version used to fork from a new fiber, but fibers have a much smaller + # stack space (https://bugs.ruby-lang.org/issues/3187), so it would break large applications. + # + # The latest version now use `throw` to unwind the stack after the fork, it however + # restrict it to be called only inside `handle_clean_fork`. + if Thread.current[:pitchfork_handle_clean_fork] + throw self, block + else + while block + block = catch(self) do + Thread.current[:pitchfork_handle_clean_fork] = true + block.call + nil + end end end + rescue + abort + else + exit end - rescue - abort - else - exit end - end - def self.fork_sibling(&block) - if REFORKING_AVAILABLE - # We double fork so that the new worker is re-attached back - # to the master. - # This requires either PR_SET_CHILD_SUBREAPER which is exclusive to Linux 3.4 - # or the master to be PID 1. - if middle_pid = Process.fork # parent - # We need to wait(2) so that the middle process doesn't end up a zombie. - Process.wait(middle_pid) - else # first child - clean_fork(&block) # detach into a grand child - exit + def fork_sibling(&block) + if REFORKING_AVAILABLE + # We double fork so that the new worker is re-attached back + # to the master. + # This requires either PR_SET_CHILD_SUBREAPER which is exclusive to Linux 3.4 + # or the master to be PID 1. + if middle_pid = FORK_LOCK.synchronize { Process.fork } # parent + # We need to wait(2) so that the middle process doesn't end up a zombie. + Process.wait(middle_pid) + else # first child + clean_fork(&block) # detach into a grand child + exit + end + else + clean_fork(&block) end - else - clean_fork(&block) + + nil # it's tricky to return the PID end - nil # it's tricky to return the PID + def time_now(int = false) + Process.clock_gettime(Process::CLOCK_MONOTONIC, int ? :second : :float_second) + end end - - def self.time_now(int = false) - Process.clock_gettime(Process::CLOCK_MONOTONIC, int ? :second : :float_second) - end - # :startdoc: end # :enddoc: require 'pitchfork/pitchfork_http'