lib/httpx/channel.rb in httpx-0.1.0 vs lib/httpx/channel.rb in httpx-0.2.0

- old
+ new

@@ -48,41 +48,81 @@ when "https" then "ssl" else raise Error, "#{uri}: #{uri.scheme}: unrecognized channel" end end - io = IO.registry(type).new(uri, options) - new(io, options) + new(type, uri, options) end end def_delegator :@io, :closed? def_delegator :@write_buffer, :empty? - def initialize(io, options) - @io = io + attr_reader :uri, :state + + def initialize(type, uri, options) + @type = type + @uri = uri + @hostnames = [@uri.host] @options = Options.new(options) @window_size = @options.window_size @read_buffer = Buffer.new(BUFFER_SIZE) @write_buffer = Buffer.new(BUFFER_SIZE) @pending = [] @state = :idle on(:error) { |ex| on_error(ex) } end + def addresses=(addrs) + @io = IO.registry(@type).new(@uri, addrs, @options) + end + + def mergeable?(addresses) + return false if @state == :closing || !@io + !(@io.addresses & addresses).empty? + end + + # coalescable channels need to be mergeable! + # but internally, #mergeable? is called before #coalescable? + def coalescable?(channel) + if @io.protocol == "h2" && @uri.scheme == "https" + @io.verify_hostname(channel.uri.host) + else + @uri.host == channel.uri.host && + @uri.port == channel.uri.port && + @uri.scheme == channel.uri.scheme + end + end + + def merge(channel) + @hostnames += channel.instance_variable_get(:@hostnames) + pending = channel.instance_variable_get(:@pending) + pending.each do |req, args| + send(req, args) + end + end + + def unmerge(channel) + @hostnames -= channel.instance_variable_get(:@hostnames) + [@parser.pending, @pending].each do |pending| + pending.reject! do |request| + request.uri == channel.uri && begin + request.transition(:idle) + channel.send(request) + true + end + end + end + end + def match?(uri) return false if @state == :closing - ips = begin - Resolv.getaddresses(uri.host) - rescue StandardError - [uri.host] - end - ips.include?(@io.ip) && - uri.port == @io.port && - uri.scheme == @io.scheme + @hostnames.include?(uri.host) && + uri.port == @uri.port && + uri.scheme == @uri.scheme end def interests return :w if @state == :idle readable = !@read_buffer.full? @@ -112,11 +152,13 @@ transition(:closed) emit(:close) end def send(request, **args) - if @parser && !@write_buffer.full? + if @error_response + emit(:response, request, @error_response) + elsif @parser && !@write_buffer.full? parser.send(request, **args) else @pending << [request, args] end end @@ -209,37 +251,49 @@ transition(:idle) transition(:open) end end parser.on(:error) do |request, ex| - response = ErrorResponse.new(ex, @options) - emit(:response, request, response) + case ex + when MisdirectedRequestError + emit(:uncoalesce, request.uri) + else + response = ErrorResponse.new(ex, @options) + emit(:response, request, response) + end end parser end def transition(nextstate) case nextstate # when :idle - + when :idle + @error_response = nil when :open return if @state == :closed @io.connect return unless @io.connected? send_pending + emit(:open) when :closing return unless @state == :open when :closed return unless @state == :closing return unless @write_buffer.empty? @io.close @read_buffer.clear end @state = nextstate + rescue Errno::EHOSTUNREACH + # at this point, all addresses from the IO object have failed + reset + emit(:unreachable) + throw(:jump_tick) rescue Errno::ECONNREFUSED, - Errno::ENETUNREACH, Errno::EADDRNOTAVAIL, + Errno::EHOSTUNREACH, OpenSSL::SSL::SSLError => e # connect errors, exit gracefully handle_error(e) @state = :closed emit(:close) @@ -249,13 +303,13 @@ handle_error(ex) reset end def handle_error(e) - parser.handle_error(e) if parser.respond_to?(:handle_error) - response = ErrorResponse.new(e, @options) + parser.handle_error(e) if @parser && parser.respond_to?(:handle_error) + @error_response = ErrorResponse.new(e, @options) @pending.each do |request, _| - emit(:response, request, response) + emit(:response, request, @error_response) end end end end