lib/httpx/selector.rb in httpx-0.7.0 vs lib/httpx/selector.rb in httpx-0.8.0

- old
+ new

@@ -1,135 +1,136 @@ # frozen_string_literal: true +require "io/wait" + +module IOExtensions # :nodoc: + refine IO do + def wait(timeout = nil, mode = :read) + case mode + when :read + wait_readable(timeout) + when :write + wait_writable(timeout) + when :read_write + r, w = IO.select([self], [self], nil, timeout) + + return unless r || w + + self + end + end + end +end + class HTTPX::Selector READABLE = %i[rw r].freeze WRITABLE = %i[rw w].freeze private_constant :READABLE private_constant :WRITABLE - # - # I/O monitor - # - class Monitor - attr_accessor :io, :interests, :readiness + using IOExtensions unless IO.method_defined?(:wait) && IO.instance_method(:wait).arity == 2 - def initialize(io, interests, reactor) - @io = io - @interests = interests - @reactor = reactor - @closed = false - end + def initialize + @selectables = [] + end - def readable? - READABLE.include?(@interests) - end + # deregisters +io+ from selectables. + def deregister(io) + @selectables.delete(io) + end - def writable? - WRITABLE.include?(@interests) - end + # register +io+. + def register(io) + return if @selectables.include?(io) - # closes +@io+, deregisters from reactor (unless +deregister+ is false) - def close(deregister = true) - return if @closed + @selectables << io + end - @closed = true - @reactor.deregister(@io) if deregister - end + private - def closed? - @closed - end + READ_INTERESTS = %i[r rw].freeze + WRITE_INTERESTS = %i[w rw].freeze - # :nocov: - def to_s - "#<#{self.class}: #{@io}(closed:#{@closed}) #{@interests} #{object_id.to_s(16)}>" - end - # :nocov: - end + def select_many(interval) + selectables, r, w = nil - def initialize - @selectables = {} - @__r__, @__w__ = IO.pipe - @closed = false - end + # first, we group IOs based on interest type. On call to #interests however, + # things might already happen, and new IOs might be registered, so we might + # have to start all over again. We do this until we group all selectables + loop do + begin + r = nil + w = nil - # deregisters +io+ from selectables. - def deregister(io) - monitor = @selectables.delete(io) - monitor.close(false) if monitor - end + selectables = @selectables + @selectables = [] - # register +io+ for +interests+ events. - def register(io, interests) - monitor = @selectables[io] - if monitor - monitor.interests = interests - else - monitor = Monitor.new(io, interests, self) - @selectables[io] = monitor - end - monitor - end + selectables.each do |io| + interests = io.interests - # waits for read/write events for +interval+. Yields for monitors of - # selected IO objects. - # - def select(interval) - begin - r = [@__r__] - w = [] + (r ||= []) << io if READ_INTERESTS.include?(interests) + (w ||= []) << io if WRITE_INTERESTS.include?(interests) + end - @selectables.each do |io, monitor| - r << io if monitor.interests == :r || monitor.interests == :rw - w << io if monitor.interests == :w || monitor.interests == :rw - monitor.readiness = nil + if @selectables.empty? + @selectables = selectables + break + else + @selectables = [*selectables, @selectables] + end + rescue StandardError + @selectables = selectables if selectables + raise end + end + # TODO: what to do if there are no selectables? + + begin readers, writers = IO.select(r, w, nil, interval) raise HTTPX::TimeoutError.new(interval, "timed out while waiting on select") if readers.nil? && writers.nil? rescue IOError, SystemCallError - @selectables.reject! { |io, _| io.closed? } + @selectables.reject!(&:closed?) retry end readers.each do |io| - if io == @__r__ - # clean up wakeups - @__r__.read(@__r__.stat.size) - else - monitor = io.closed? ? @selectables.delete(io) : @selectables[io] - next unless monitor + yield io - monitor.readiness = writers.delete(io) ? :rw : :r - yield monitor - end + # so that we don't yield 2 times + writers.delete(io) end if readers writers.each do |io| - monitor = io.closed? ? @selectables.delete(io) : @selectables[io] - next unless monitor - - # don't double run this, the last iteration might have run this task already - monitor.readiness = :w - yield monitor + yield io end if writers end - # Closes the selector. - # - def close - return if @closed + def select_one(interval) + io = @selectables.first - @__r__.close - @__w__.close - rescue IOError - ensure - @closed = true + interests = io.interests + + result = case interests + when :r then io.to_io.wait_readable(interval) + when :w then io.to_io.wait_writable(interval) + when :rw then io.to_io.wait(interval, :read_write) + when nil then return + end + + raise HTTPX::TimeoutError.new(interval, "timed out while waiting on select") unless result + + yield io + rescue IOError, SystemCallError + @selectables.reject!(&:closed?) end - # interrupts the select call. - def wakeup - @__w__.write_nonblock("\0", exception: false) + def select(interval, &block) + return select_one(interval, &block) if @selectables.size == 1 + + select_many(interval, &block) end + + public :select end