lib/httpx/selector.rb in httpx-0.7.0 vs lib/httpx/selector.rb in httpx-0.8.0
- old
+ new
@@ -1,135 +1,136 @@
# frozen_string_literal: true
+require "io/wait"
+
+module IOExtensions # :nodoc:
+ refine IO do
+ def wait(timeout = nil, mode = :read)
+ case mode
+ when :read
+ wait_readable(timeout)
+ when :write
+ wait_writable(timeout)
+ when :read_write
+ r, w = IO.select([self], [self], nil, timeout)
+
+ return unless r || w
+
+ self
+ end
+ end
+ end
+end
+
class HTTPX::Selector
READABLE = %i[rw r].freeze
WRITABLE = %i[rw w].freeze
private_constant :READABLE
private_constant :WRITABLE
- #
- # I/O monitor
- #
- class Monitor
- attr_accessor :io, :interests, :readiness
+ using IOExtensions unless IO.method_defined?(:wait) && IO.instance_method(:wait).arity == 2
- def initialize(io, interests, reactor)
- @io = io
- @interests = interests
- @reactor = reactor
- @closed = false
- end
+ def initialize
+ @selectables = []
+ end
- def readable?
- READABLE.include?(@interests)
- end
+ # deregisters +io+ from selectables.
+ def deregister(io)
+ @selectables.delete(io)
+ end
- def writable?
- WRITABLE.include?(@interests)
- end
+ # register +io+.
+ def register(io)
+ return if @selectables.include?(io)
- # closes +@io+, deregisters from reactor (unless +deregister+ is false)
- def close(deregister = true)
- return if @closed
+ @selectables << io
+ end
- @closed = true
- @reactor.deregister(@io) if deregister
- end
+ private
- def closed?
- @closed
- end
+ READ_INTERESTS = %i[r rw].freeze
+ WRITE_INTERESTS = %i[w rw].freeze
- # :nocov:
- def to_s
- "#<#{self.class}: #{@io}(closed:#{@closed}) #{@interests} #{object_id.to_s(16)}>"
- end
- # :nocov:
- end
+ def select_many(interval)
+ selectables, r, w = nil
- def initialize
- @selectables = {}
- @__r__, @__w__ = IO.pipe
- @closed = false
- end
+ # first, we group IOs based on interest type. On call to #interests however,
+ # things might already happen, and new IOs might be registered, so we might
+ # have to start all over again. We do this until we group all selectables
+ loop do
+ begin
+ r = nil
+ w = nil
- # deregisters +io+ from selectables.
- def deregister(io)
- monitor = @selectables.delete(io)
- monitor.close(false) if monitor
- end
+ selectables = @selectables
+ @selectables = []
- # register +io+ for +interests+ events.
- def register(io, interests)
- monitor = @selectables[io]
- if monitor
- monitor.interests = interests
- else
- monitor = Monitor.new(io, interests, self)
- @selectables[io] = monitor
- end
- monitor
- end
+ selectables.each do |io|
+ interests = io.interests
- # waits for read/write events for +interval+. Yields for monitors of
- # selected IO objects.
- #
- def select(interval)
- begin
- r = [@__r__]
- w = []
+ (r ||= []) << io if READ_INTERESTS.include?(interests)
+ (w ||= []) << io if WRITE_INTERESTS.include?(interests)
+ end
- @selectables.each do |io, monitor|
- r << io if monitor.interests == :r || monitor.interests == :rw
- w << io if monitor.interests == :w || monitor.interests == :rw
- monitor.readiness = nil
+ if @selectables.empty?
+ @selectables = selectables
+ break
+ else
+ @selectables = [*selectables, @selectables]
+ end
+ rescue StandardError
+ @selectables = selectables if selectables
+ raise
end
+ end
+ # TODO: what to do if there are no selectables?
+
+ begin
readers, writers = IO.select(r, w, nil, interval)
raise HTTPX::TimeoutError.new(interval, "timed out while waiting on select") if readers.nil? && writers.nil?
rescue IOError, SystemCallError
- @selectables.reject! { |io, _| io.closed? }
+ @selectables.reject!(&:closed?)
retry
end
readers.each do |io|
- if io == @__r__
- # clean up wakeups
- @__r__.read(@__r__.stat.size)
- else
- monitor = io.closed? ? @selectables.delete(io) : @selectables[io]
- next unless monitor
+ yield io
- monitor.readiness = writers.delete(io) ? :rw : :r
- yield monitor
- end
+ # so that we don't yield 2 times
+ writers.delete(io)
end if readers
writers.each do |io|
- monitor = io.closed? ? @selectables.delete(io) : @selectables[io]
- next unless monitor
-
- # don't double run this, the last iteration might have run this task already
- monitor.readiness = :w
- yield monitor
+ yield io
end if writers
end
- # Closes the selector.
- #
- def close
- return if @closed
+ def select_one(interval)
+ io = @selectables.first
- @__r__.close
- @__w__.close
- rescue IOError
- ensure
- @closed = true
+ interests = io.interests
+
+ result = case interests
+ when :r then io.to_io.wait_readable(interval)
+ when :w then io.to_io.wait_writable(interval)
+ when :rw then io.to_io.wait(interval, :read_write)
+ when nil then return
+ end
+
+ raise HTTPX::TimeoutError.new(interval, "timed out while waiting on select") unless result
+
+ yield io
+ rescue IOError, SystemCallError
+ @selectables.reject!(&:closed?)
end
- # interrupts the select call.
- def wakeup
- @__w__.write_nonblock("\0", exception: false)
+ def select(interval, &block)
+ return select_one(interval, &block) if @selectables.size == 1
+
+ select_many(interval, &block)
end
+
+ public :select
end