lib/puma/reactor.rb in piesync-puma-3.12.6.1 vs lib/puma/reactor.rb in piesync-puma-5.4.0.1
- old
+ new
@@ -1,347 +1,116 @@
# frozen_string_literal: true
-require 'puma/util'
-require 'puma/minissl'
+require 'puma/queue_close' unless ::Queue.instance_methods.include? :close
module Puma
- # Internal Docs, Not a public interface.
+ class UnsupportedBackend < StandardError; end
+
+ # Monitors a collection of IO objects, calling a block whenever
+ # any monitored object either receives data or times out, or when the Reactor shuts down.
#
- # The Reactor object is responsible for ensuring that a request has been
- # completely received before it starts to be processed. This may be known as read buffering.
- # If read buffering is not done, and no other read buffering is performed (such as by an application server
- # such as nginx) then the application would be subject to a slow client attack.
+ # The waiting/wake up is performed with nio4r, which will use the appropriate backend (libev,
+ # Java NIO or just plain IO#select). The call to `NIO::Selector#select` will
+ # 'wakeup' any IO object that receives data.
#
- # Each Puma "worker" process has its own Reactor. For example if you start puma with `$ puma -w 5` then
- # it will have 5 workers and each worker will have it's own reactor.
+ # This class additionally tracks a timeout for every added object,
+ # and wakes up any object when its timeout elapses.
#
- # For a graphical representation of how the reactor works see [architecture.md](https://github.com/puma/puma/blob/master/docs/architecture.md#connection-pipeline).
- #
- # ## Reactor Flow
- #
- # A request comes into a `Puma::Server` instance, it is then passed to a `Puma::Reactor` instance.
- # The reactor stores the request in an array and calls `IO.select` on the array in a loop.
- #
- # When the request is written to by the client then the `IO.select` will "wake up" and
- # return the references to any objects that caused it to "wake". The reactor
- # then loops through each of these request objects, and sees if they're complete. If they
- # have a full header and body then the reactor passes the request to a thread pool.
- # Once in a thread pool, a "worker thread" can run the the application's Ruby code against the request.
- #
- # If the request is not complete, then it stays in the array, and the next time any
- # data is written to that socket reference, then the loop is woken up and it is checked for completeness again.
- #
- # A detailed example is given in the docs for `run_internal` which is where the bulk
- # of this logic lives.
+ # The implementation uses a Queue to synchronize adding new objects from the internal select loop.
class Reactor
- DefaultSleepFor = 5
-
- # Creates an instance of Puma::Reactor
- #
- # The `server` argument is an instance of `Puma::Server`
- # this is used to write a response for "low level errors"
- # when there is an exception inside of the reactor.
- #
- # The `app_pool` is an instance of `Puma::ThreadPool`.
- # Once a request is fully formed (header and body are received)
- # it will be passed to the `app_pool`.
- def initialize(server, app_pool)
- @server = server
- @events = server.events
- @app_pool = app_pool
-
- @mutex = Mutex.new
-
- # Read / Write pipes to wake up internal while loop
- @ready, @trigger = Puma::Util.pipe
- @input = []
- @sleep_for = DefaultSleepFor
+ # Create a new Reactor to monitor IO objects added by #add.
+ # The provided block will be invoked when an IO has data available to read,
+ # its timeout elapses, or when the Reactor shuts down.
+ def initialize(backend, &block)
+ require 'nio'
+ unless backend == :auto || NIO::Selector.backends.include?(backend)
+ raise "unsupported IO selector backend: #{backend} (available backends: #{NIO::Selector.backends.join(', ')})"
+ end
+ @selector = backend == :auto ? NIO::Selector.new : NIO::Selector.new(backend)
+ @input = Queue.new
@timeouts = []
-
- @sockets = [@ready]
+ @block = block
end
- private
-
-
- # Until a request is added via the `add` method this method will internally
- # loop, waiting on the `sockets` array objects. The only object in this
- # array at first is the `@ready` IO object, which is the read end of a pipe
- # connected to `@trigger` object. When `@trigger` is written to, then the loop
- # will break on `IO.select` and return an array.
- #
- # ## When a request is added:
- #
- # When the `add` method is called, an instance of `Puma::Client` is added to the `@input` array.
- # Next the `@ready` pipe is "woken" by writing a string of `"*"` to `@trigger`.
- #
- # When that happens, the internal loop stops blocking at `IO.select` and returns a reference
- # to whatever "woke" it up. On the very first loop, the only thing in `sockets` is `@ready`.
- # When `@trigger` is written-to, the loop "wakes" and the `ready`
- # variable returns an array of arrays that looks like `[[#<IO:fd 10>], [], []]` where the
- # first IO object is the `@ready` object. This first array `[#<IO:fd 10>]`
- # is saved as a `reads` variable.
- #
- # The `reads` variable is iterated through. In the case that the object
- # is the same as the `@ready` input pipe, then we know that there was a `trigger` event.
- #
- # If there was a trigger event, then one byte of `@ready` is read into memory. In the case of the first request,
- # the reactor sees that it's a `"*"` value and the reactor adds the contents of `@input` into the `sockets` array.
- # The while then loop continues to iterate again, but now the `sockets` array contains a `Puma::Client` instance in addition
- # to the `@ready` IO object. For example: `[#<IO:fd 10>, #<Puma::Client:0x3fdc1103bee8 @ready=false>]`.
- #
- # Since the `Puma::Client` in this example has data that has not been read yet,
- # the `IO.select` is immediately able to "wake" and read from the `Puma::Client`. At this point the
- # `ready` output looks like this: `[[#<Puma::Client:0x3fdc1103bee8 @ready=false>], [], []]`.
- #
- # Each element in the first entry is iterated over. The `Puma::Client` object is not
- # the `@ready` pipe, so the reactor checks to see if it has the fully header and body with
- # the `Puma::Client#try_to_finish` method. If the full request has been sent,
- # then the request is passed off to the `@app_pool` thread pool so that a "worker thread"
- # can pick up the request and begin to execute application logic. This is done
- # via `@app_pool << c`. The `Puma::Client` is then removed from the `sockets` array.
- #
- # If the request body is not present then nothing will happen, and the loop will iterate
- # again. When the client sends more data to the socket the `Puma::Client` object will
- # wake up the `IO.select` and it can again be checked to see if it's ready to be
- # passed to the thread pool.
- #
- # ## Time Out Case
- #
- # In addition to being woken via a write to one of the sockets the `IO.select` will
- # periodically "time out" of the sleep. One of the functions of this is to check for
- # any requests that have "timed out". At the end of the loop it's checked to see if
- # the first element in the `@timeout` array has exceed it's allowed time. If so,
- # the client object is removed from the timeout aray, a 408 response is written.
- # Then it's connection is closed, and the object is removed from the `sockets` array
- # that watches for new data.
- #
- # This behavior loops until all the objects that have timed out have been removed.
- #
- # Once all the timeouts have been processed, the next duration of the `IO.select` sleep
- # will be set to be equal to the amount of time it will take for the next timeout to occur.
- # This calculation happens in `calculate_sleep`.
- def run_internal
- sockets = @sockets
-
- while true
- begin
- ready = IO.select sockets, nil, nil, @sleep_for
- rescue IOError => e
- Thread.current.purge_interrupt_queue if Thread.current.respond_to? :purge_interrupt_queue
- if sockets.any? { |socket| socket.closed? }
- STDERR.puts "Error in select: #{e.message} (#{e.class})"
- STDERR.puts e.backtrace
- sockets = sockets.reject { |socket| socket.closed? }
- retry
- else
- raise
- end
+ # Run the internal select loop, using a background thread by default.
+ def run(background=true)
+ if background
+ @thread = Thread.new do
+ Puma.set_thread_name "reactor"
+ select_loop
end
-
- if ready and reads = ready[0]
- reads.each do |c|
- if c == @ready
- @mutex.synchronize do
- case @ready.read(1)
- when "*"
- sockets += @input
- @input.clear
- when "c"
- sockets.delete_if do |s|
- if s == @ready
- false
- else
- s.close
- true
- end
- end
- when "!"
- return
- end
- end
- else
- # We have to be sure to remove it from the timeout
- # list or we'll accidentally close the socket when
- # it's in use!
- if c.timeout_at
- @mutex.synchronize do
- @timeouts.delete c
- end
- end
-
- begin
- if c.try_to_finish
- @app_pool << c
- sockets.delete c
- end
-
- # Don't report these to the lowlevel_error handler, otherwise
- # will be flooding them with errors when persistent connections
- # are closed.
- rescue ConnectionError
- c.write_500
- c.close
-
- sockets.delete c
-
- # SSL handshake failure
- rescue MiniSSL::SSLError => e
- @server.lowlevel_error(e, c.env)
-
- ssl_socket = c.io
- addr = ssl_socket.peeraddr.last
- cert = ssl_socket.peercert
-
- c.close
- sockets.delete c
-
- @events.ssl_error @server, addr, cert, e
-
- # The client doesn't know HTTP well
- rescue HttpParserError => e
- @server.lowlevel_error(e, c.env)
-
- c.write_400
- c.close
-
- sockets.delete c
-
- @events.parse_error @server, c.env, e
- rescue StandardError => e
- @server.lowlevel_error(e, c.env)
-
- c.write_500
- c.close
-
- sockets.delete c
- end
- end
- end
- end
-
- unless @timeouts.empty?
- @mutex.synchronize do
- now = Time.now
-
- while @timeouts.first.timeout_at < now
- c = @timeouts.shift
- c.write_408 if c.in_data_phase
- c.close
- sockets.delete c
-
- break if @timeouts.empty?
- end
-
- calculate_sleep
- end
- end
+ else
+ select_loop
end
end
- public
-
- def run
- run_internal
- ensure
- @trigger.close
- @ready.close
+ # Add a new client to monitor.
+ # The object must respond to #timeout and #timeout_at.
+ # Returns false if the reactor is already shut down.
+ def add(client)
+ @input << client
+ @selector.wakeup
+ true
+ rescue ClosedQueueError
+ false
end
- def run_in_thread
- @thread = Thread.new do
- begin
- run_internal
- rescue StandardError => e
- STDERR.puts "Error in reactor loop escaped: #{e.message} (#{e.class})"
- STDERR.puts e.backtrace
- retry
- ensure
- @trigger.close
- @ready.close
- end
+ # Shutdown the reactor, blocking until the background thread is finished.
+ def shutdown
+ @input.close
+ begin
+ @selector.wakeup
+ rescue IOError # Ignore if selector is already closed
end
+ @thread.join if @thread
end
- # The `calculate_sleep` sets the value that the `IO.select` will
- # sleep for in the main reactor loop when no sockets are being written to.
- #
- # The values kept in `@timeouts` are sorted so that the first timeout
- # comes first in the array. When there are no timeouts the default timeout is used.
- #
- # Otherwise a sleep value is set that is the same as the amount of time it
- # would take for the first element to time out.
- #
- # If that value is in the past, then a sleep value of zero is used.
- def calculate_sleep
- if @timeouts.empty?
- @sleep_for = DefaultSleepFor
- else
- diff = @timeouts.first.timeout_at.to_f - Time.now.to_f
+ private
- if diff < 0.0
- @sleep_for = 0
- else
- @sleep_for = diff
- end
- end
- end
+ def select_loop
+ begin
+ until @input.closed? && @input.empty?
+ # Wakeup any registered object that receives incoming data.
+ # Block until the earliest timeout or Selector#wakeup is called.
+ timeout = (earliest = @timeouts.first) && earliest.timeout
+ @selector.select(timeout) {|mon| wakeup!(mon.value)}
- # This method adds a connection to the reactor
- #
- # Typically called by `Puma::Server` the value passed in
- # is usually a `Puma::Client` object that responds like an IO
- # object.
- #
- # The main body of the reactor loop is in `run_internal` and it
- # will sleep on `IO.select`. When a new connection is added to the
- # reactor it cannot be added directly to the `sockets` aray, because
- # the `IO.select` will not be watching for it yet.
- #
- # Instead what needs to happen is that `IO.select` needs to be woken up,
- # the contents of `@input` added to the `sockets` array, and then
- # another call to `IO.select` needs to happen. Since the `Puma::Client`
- # object can be read immediately, it does not block, but instead returns
- # right away.
- #
- # This behavior is accomplished by writing to `@trigger` which wakes up
- # the `IO.select` and then there is logic to detect the value of `*`,
- # pull the contents from `@input` and add them to the sockets array.
- #
- # If the object passed in has a timeout value in `timeout_at` then
- # it is added to a `@timeouts` array. This array is then re-arranged
- # so that the first element to timeout will be at the front of the
- # array. Then a value to sleep for is derived in the call to `calculate_sleep`
- def add(c)
- @mutex.synchronize do
- @input << c
- @trigger << "*"
+ # Wakeup all objects that timed out.
+ timed_out = @timeouts.take_while {|t| t.timeout == 0}
+ timed_out.each(&method(:wakeup!))
- if c.timeout_at
- @timeouts << c
- @timeouts.sort! { |a,b| a.timeout_at <=> b.timeout_at }
-
- calculate_sleep
+ unless @input.empty?
+ until @input.empty?
+ client = @input.pop
+ register(client) if client.io_ok?
+ end
+ @timeouts.sort_by!(&:timeout_at)
+ end
end
+ rescue StandardError => e
+ STDERR.puts "Error in reactor loop escaped: #{e.message} (#{e.class})"
+ STDERR.puts e.backtrace
+ retry
end
+ # Wakeup all remaining objects on shutdown.
+ @timeouts.each(&@block)
+ @selector.close
end
- # Close all watched sockets and clear them from being watched
- def clear!
- begin
- @trigger << "c"
- rescue IOError
- Thread.current.purge_interrupt_queue if Thread.current.respond_to? :purge_interrupt_queue
- end
+ # Start monitoring the object.
+ def register(client)
+ @selector.register(client.to_io, :r).value = client
+ @timeouts << client
+ rescue ArgumentError
+ # unreadable clients raise error when processed by NIO
end
- def shutdown
- begin
- @trigger << "!"
- rescue IOError
- Thread.current.purge_interrupt_queue if Thread.current.respond_to? :purge_interrupt_queue
+ # 'Wake up' a monitored object by calling the provided block.
+ # Stop monitoring the object if the block returns `true`.
+ def wakeup!(client)
+ if @block.call client
+ @selector.deregister client.to_io
+ @timeouts.delete client
end
-
- @thread.join
end
end
end