lib/pitchfork.rb in pitchfork-0.7.0 vs lib/pitchfork.rb in pitchfork-0.8.0
- old
+ new
@@ -34,160 +34,193 @@
BootFailure = Class.new(StandardError)
# :stopdoc:
- # This returns a lambda to pass in as the app, this does not "build" the
- # app The returned lambda will be called when it is
- # time to build the app.
- def self.builder(ru, op)
- # allow Configurator to parse cli switches embedded in the ru file
- op = Pitchfork::Configurator::RACKUP.merge!(:file => ru, :optparse => op)
- if ru =~ /\.ru$/ && !defined?(Rack::Builder)
- abort "rack and Rack::Builder must be available for processing #{ru}"
+ FORK_LOCK = Monitor.new
+ @socket_type = :SOCK_SEQPACKET
+
+ class << self
+ # :startdoc:
+
+ # Prevent Pitchfork from forking new children for the duration of the block.
+ #
+ # If you have background threads calling code that synchronize native locks,
+ # while the GVL is released, forking while they are held could leak to
+ # corrupted children.
+ #
+ # One example of this is `getaddrinfo(3)`, so opening a connection from a
+ # background thread has a chance to produce stuck children.
+ #
+ # To avoid this you can wrap such code in `Pitchfork.prevent_fork`:
+ #
+ # def heartbeat_thread
+ # @heartbeat_thread ||= Thread.new do
+ # loop do
+ # Pitchfork.prevent_fork do
+ # heartbeat
+ # end
+ # sleep 10
+ # end
+ # end
+ # end
+ #
+ def prevent_fork(&block)
+ FORK_LOCK.synchronize(&block)
end
- # always called after config file parsing, may be called after forking
- lambda do |_, server|
- inner_app = case ru
- when /\.ru$/
- raw = File.read(ru)
- raw.sub!(/^__END__\n.*/, '')
- eval("Rack::Builder.new {(\n#{raw}\n)}.to_app", TOPLEVEL_BINDING, ru)
- else
- require ru
- Object.const_get(File.basename(ru, '.rb').capitalize)
+ # :stopdoc:
+
+ # This returns a lambda to pass in as the app, this does not "build" the
+ # app The returned lambda will be called when it is
+ # time to build the app.
+ def builder(ru, op)
+ # allow Configurator to parse cli switches embedded in the ru file
+ op = Pitchfork::Configurator::RACKUP.merge!(:file => ru, :optparse => op)
+ if ru =~ /\.ru$/ && !defined?(Rack::Builder)
+ abort "rack and Rack::Builder must be available for processing #{ru}"
end
- Rack::Builder.new do
- use(Rack::ContentLength)
- use(Pitchfork::Chunked)
- use(Rack::Lint) if ENV["RACK_ENV"] == "development"
- use(Rack::TempfileReaper)
- run inner_app
- end.to_app
+ # always called after config file parsing, may be called after forking
+ lambda do |_, server|
+ inner_app = case ru
+ when /\.ru$/
+ raw = File.read(ru)
+ raw.sub!(/^__END__\n.*/, '')
+ eval("Rack::Builder.new {(\n#{raw}\n)}.to_app", TOPLEVEL_BINDING, ru)
+ else
+ require ru
+ Object.const_get(File.basename(ru, '.rb').capitalize)
+ end
+
+ Rack::Builder.new do
+ use(Rack::ContentLength)
+ use(Pitchfork::Chunked)
+ use(Rack::Lint) if ENV["RACK_ENV"] == "development"
+ use(Rack::TempfileReaper)
+ run inner_app
+ end.to_app
+ end
end
- end
- # returns an array of strings representing TCP listen socket addresses
- # and Unix domain socket paths. This is useful for use with
- # Raindrops::Middleware under Linux: https://yhbt.net/raindrops/
- def self.listener_names
- Pitchfork::HttpServer::LISTENERS.map do |io|
- Pitchfork::SocketHelper.sock_name(io)
+ # returns an array of strings representing TCP listen socket addresses
+ # and Unix domain socket paths. This is useful for use with
+ # Raindrops::Middleware under Linux: https://yhbt.net/raindrops/
+ def listener_names
+ Pitchfork::HttpServer::LISTENERS.map do |io|
+ Pitchfork::SocketHelper.sock_name(io)
+ end
end
- end
- def self.log_error(logger, prefix, exc)
- message = exc.message
- message = message.dump if /[[:cntrl:]]/ =~ message
- logger.error "#{prefix}: #{message} (#{exc.class})"
- exc.backtrace.each { |line| logger.error(line) }
- end
+ def log_error(logger, prefix, exc)
+ message = exc.message
+ message = message.dump if /[[:cntrl:]]/ =~ message
+ logger.error "#{prefix}: #{message} (#{exc.class})"
+ exc.backtrace.each { |line| logger.error(line) }
+ end
- F_SETPIPE_SZ = 1031 if RUBY_PLATFORM =~ /linux/
+ F_SETPIPE_SZ = 1031 if RUBY_PLATFORM =~ /linux/
- def self.pipe # :nodoc:
- IO.pipe.each do |io|
- # shrink pipes to minimize impact on /proc/sys/fs/pipe-user-pages-soft
- # limits.
- if defined?(F_SETPIPE_SZ)
- begin
- io.fcntl(F_SETPIPE_SZ, Raindrops::PAGE_SIZE)
- rescue Errno::EINVAL
- # old kernel
- rescue Errno::EPERM
- # resizes fail if Linux is close to the pipe limit for the user
- # or if the user does not have permissions to resize
+ def pipe # :nodoc:
+ IO.pipe.each do |io|
+ # shrink pipes to minimize impact on /proc/sys/fs/pipe-user-pages-soft
+ # limits.
+ if defined?(F_SETPIPE_SZ)
+ begin
+ io.fcntl(F_SETPIPE_SZ, Raindrops::PAGE_SIZE)
+ rescue Errno::EINVAL
+ # old kernel
+ rescue Errno::EPERM
+ # resizes fail if Linux is close to the pipe limit for the user
+ # or if the user does not have permissions to resize
+ end
end
end
end
- end
- @socket_type = :SOCK_SEQPACKET
- def self.socketpair
- pair = UNIXSocket.socketpair(@socket_type).map { |s| MessageSocket.new(s) }
- pair[0].close_write
- pair[1].close_read
- pair
- rescue Errno::EPROTONOSUPPORT
- if @socket_type == :SOCK_SEQPACKET
- # macOS and very old linuxes don't support SOCK_SEQPACKET (SCTP).
- # In such case we can fallback to SOCK_STREAM (TCP)
- warn("SEQPACKET (SCTP) isn't supported, falling back to STREAM")
- @socket_type = :SOCK_STREAM
- retry
- else
- raise
+ def socketpair
+ pair = UNIXSocket.socketpair(@socket_type).map { |s| MessageSocket.new(s) }
+ pair[0].close_write
+ pair[1].close_read
+ pair
+ rescue Errno::EPROTONOSUPPORT
+ if @socket_type == :SOCK_SEQPACKET
+ # macOS and very old linuxes don't support SOCK_SEQPACKET (SCTP).
+ # In such case we can fallback to SOCK_STREAM (TCP)
+ warn("SEQPACKET (SCTP) isn't supported, falling back to STREAM")
+ @socket_type = :SOCK_STREAM
+ retry
+ else
+ raise
+ end
end
- end
- def self.clean_fork(setpgid: true, &block)
- if pid = Process.fork
- if setpgid
- Process.setpgid(pid, pid) # Make into a group leader
+ def clean_fork(setpgid: true, &block)
+ if pid = FORK_LOCK.synchronize { Process.fork }
+ if setpgid
+ Process.setpgid(pid, pid) # Make into a group leader
+ end
+ return pid
end
- return pid
- end
- begin
- # Pitchfork recursively refork the worker processes.
- # Because of this we need to unwind the stack before resuming execution
- # in the child, otherwise on each generation the available stack space would
- # get smaller and smaller until it's basically 0.
- #
- # The very first version of this method used to call fork from a new
- # thread, however this can cause issues with some native gems that rely on
- # pthread_atfork(3) or pthread_mutex_lock(3), as the new main thread would
- # now be different.
- #
- # A second version used to fork from a new fiber, but fibers have a much smaller
- # stack space (https://bugs.ruby-lang.org/issues/3187), so it would break large applications.
- #
- # The latest version now use `throw` to unwind the stack after the fork, it however
- # restrict it to be called only inside `handle_clean_fork`.
- if Thread.current[:pitchfork_handle_clean_fork]
- throw self, block
- else
- while block
- block = catch(self) do
- Thread.current[:pitchfork_handle_clean_fork] = true
- block.call
- nil
+ begin
+ # Pitchfork recursively refork the worker processes.
+ # Because of this we need to unwind the stack before resuming execution
+ # in the child, otherwise on each generation the available stack space would
+ # get smaller and smaller until it's basically 0.
+ #
+ # The very first version of this method used to call fork from a new
+ # thread, however this can cause issues with some native gems that rely on
+ # pthread_atfork(3) or pthread_mutex_lock(3), as the new main thread would
+ # now be different.
+ #
+ # A second version used to fork from a new fiber, but fibers have a much smaller
+ # stack space (https://bugs.ruby-lang.org/issues/3187), so it would break large applications.
+ #
+ # The latest version now use `throw` to unwind the stack after the fork, it however
+ # restrict it to be called only inside `handle_clean_fork`.
+ if Thread.current[:pitchfork_handle_clean_fork]
+ throw self, block
+ else
+ while block
+ block = catch(self) do
+ Thread.current[:pitchfork_handle_clean_fork] = true
+ block.call
+ nil
+ end
end
end
+ rescue
+ abort
+ else
+ exit
end
- rescue
- abort
- else
- exit
end
- end
- def self.fork_sibling(&block)
- if REFORKING_AVAILABLE
- # We double fork so that the new worker is re-attached back
- # to the master.
- # This requires either PR_SET_CHILD_SUBREAPER which is exclusive to Linux 3.4
- # or the master to be PID 1.
- if middle_pid = Process.fork # parent
- # We need to wait(2) so that the middle process doesn't end up a zombie.
- Process.wait(middle_pid)
- else # first child
- clean_fork(&block) # detach into a grand child
- exit
+ def fork_sibling(&block)
+ if REFORKING_AVAILABLE
+ # We double fork so that the new worker is re-attached back
+ # to the master.
+ # This requires either PR_SET_CHILD_SUBREAPER which is exclusive to Linux 3.4
+ # or the master to be PID 1.
+ if middle_pid = FORK_LOCK.synchronize { Process.fork } # parent
+ # We need to wait(2) so that the middle process doesn't end up a zombie.
+ Process.wait(middle_pid)
+ else # first child
+ clean_fork(&block) # detach into a grand child
+ exit
+ end
+ else
+ clean_fork(&block)
end
- else
- clean_fork(&block)
+
+ nil # it's tricky to return the PID
end
- nil # it's tricky to return the PID
+ def time_now(int = false)
+ Process.clock_gettime(Process::CLOCK_MONOTONIC, int ? :second : :float_second)
+ end
end
-
- def self.time_now(int = false)
- Process.clock_gettime(Process::CLOCK_MONOTONIC, int ? :second : :float_second)
- end
- # :startdoc:
end
# :enddoc:
require 'pitchfork/pitchfork_http'