lib/httpx/channel/http1.rb in httpx-0.2.1 vs lib/httpx/channel/http1.rb in httpx-0.3.0

- old
+ new

@@ -1,8 +1,8 @@ # frozen_string_literal: true -require "http_parser" +require "httpx/parser/http1" module HTTPX class Channel::HTTP1 include Callbacks include Loggable @@ -10,22 +10,20 @@ CRLF = "\r\n" def initialize(buffer, options) @options = Options.new(options) @max_concurrent_requests = @options.max_concurrent_requests - @parser = HTTP::Parser.new(self) - @parser.header_value_type = :arrays + @max_requests = Float::INFINITY + @parser = Parser::HTTP1.new(self) @buffer = buffer @version = [1, 1] @pending = [] @requests = [] - @has_response = false end def reset @parser.reset! - @has_response = false end def close reset emit(:close) @@ -37,19 +35,22 @@ @requests.empty? || @requests.all? { |request| !request.response.nil? } end def <<(data) @parser << data - dispatch if @has_response end def send(request, **) - if @requests.size >= @max_concurrent_requests + if @max_requests.positive? && + @requests.size >= @max_concurrent_requests @pending << request return end - @requests << request unless @requests.include?(request) + unless @requests.include?(request) + @requests << request + @pipelining = true if @requests.size > 1 + end handle(request) end def reenqueue! requests = @requests.dup @@ -67,103 +68,122 @@ # HTTP Parser callbacks # # must be public methods, or else they won't be reachable - def on_message_begin + def on_start log(level: 2) { "parsing begins" } end - def on_headers_complete(h) - return on_trailer_headers_complete(h) if @parser_trailers - # Wait for fix: https://github.com/tmm1/http_parser.rb/issues/52 - # callback is called 2 times when chunked - request = @requests.first - return if request.response + def on_headers(h) + @request = @requests.first + return if @request.response log(level: 2) { "headers received" } headers = @options.headers_class.new(h) - response = @options.response_class.new(@requests.last, + response = @options.response_class.new(@request, @parser.status_code, @parser.http_version.join("."), headers, @options) log(color: :yellow) { "-> HEADLINE: #{response.status} HTTP/#{@parser.http_version.join(".")}" } log(color: :yellow) { response.headers.each.map { |f, v| "-> HEADER: #{f}: #{v}" }.join("\n") } - request.response = response + @request.response = response + on_complete if response.complete? + end - @has_response = true if response.complete? + def on_trailers(h) + return unless @request + response = @request.response + log(level: 2) { "trailer headers received" } + + log(color: :yellow) { h.each.map { |f, v| "-> HEADER: #{f}: #{v}" }.join("\n") } + response.merge_headers(h) end - def on_body(chunk) + def on_data(chunk) + return unless @request log(color: :green) { "-> DATA: #{chunk.bytesize} bytes..." } log(level: 2, color: :green) { "-> #{chunk.inspect}" } - response = @requests.first.response + response = @request.response response << chunk - - @has_response = response.complete? end - def on_message_complete + def on_complete + return unless @request log(level: 2) { "parsing complete" } - request = @requests.first - response = request.response - - if !@parser_trailers && response.headers.key?("trailer") - @parser_trailers = true - # this is needed, because the parser can't accept further headers. - # we need to reset it and artificially move it to receive headers state, - # hence the bogus headline - # - @parser.reset! - @parser << "#{request.verb.to_s.upcase} #{request.path} HTTP/#{response.version}#{CRLF}" - else - @has_response = true - end + dispatch end - def on_trailer_headers_complete(h) - response = @requests.first.response - - response.merge_headers(h) - end - def dispatch - request = @requests.first - return handle(request) if request.expects? + if @request.expects? + reset + return handle(@request) + end + request = @request + @request = nil @requests.shift response = request.response emit(:response, request, response) if @parser.upgrade? response << @parser.upgrade_data throw(:called) end + reset + @max_requests -= 1 send(@pending.shift) unless @pending.empty? - return unless response.headers["connection"] == "close" - disable_concurrency - emit(:reset) + manage_connection(response) end def handle_error(ex) - @requests.each do |request| - emit(:error, request, ex) + if @pipelining + disable_pipelining + emit(:reset) + throw(:called) + else + @requests.each do |request| + emit(:error, request, ex) + end end end private - def disable_concurrency + def manage_connection(response) + connection = response.headers["connection"] + case connection + when /keep\-alive/i + keep_alive = response.headers["keep-alive"] + return unless keep_alive + parameters = Hash[keep_alive.split(/ *, */).map do |pair| + pair.split(/ *= */) + end] + @max_requests = parameters["max"].to_i if parameters.key?("max") + if parameters.key?("timeout") + keep_alive_timeout = parameters["timeout"].to_i + emit(:timeout, keep_alive_timeout) + end + # TODO: on keep alive timeout, reset + when /close/i, nil + disable_pipelining + @max_requests = Float::INFINITY + emit(:reset) + end + end + + def disable_pipelining return if @requests.empty? @requests.each { |r| r.transition(:idle) } # server doesn't handle pipelining, and probably # doesn't support keep-alive. Fallback to send only # 1 keep alive request. @max_concurrent_requests = 1 + @pipelining = false end def set_request_headers(request) request.headers["host"] ||= request.authority request.headers["connection"] ||= "keep-alive" @@ -176,10 +196,9 @@ def headline_uri(request) request.path end def handle(request) - @has_response = false set_request_headers(request) catch(:buffer_full) do request.transition(:headers) join_headers(request) if request.state == :headers request.transition(:body)