lib/httpx/channel/http1.rb in httpx-0.2.1 vs lib/httpx/channel/http1.rb in httpx-0.3.0
- old
+ new
@@ -1,8 +1,8 @@
# frozen_string_literal: true
-require "http_parser"
+require "httpx/parser/http1"
module HTTPX
class Channel::HTTP1
include Callbacks
include Loggable
@@ -10,22 +10,20 @@
CRLF = "\r\n"
def initialize(buffer, options)
@options = Options.new(options)
@max_concurrent_requests = @options.max_concurrent_requests
- @parser = HTTP::Parser.new(self)
- @parser.header_value_type = :arrays
+ @max_requests = Float::INFINITY
+ @parser = Parser::HTTP1.new(self)
@buffer = buffer
@version = [1, 1]
@pending = []
@requests = []
- @has_response = false
end
def reset
@parser.reset!
- @has_response = false
end
def close
reset
emit(:close)
@@ -37,19 +35,22 @@
@requests.empty? || @requests.all? { |request| !request.response.nil? }
end
def <<(data)
@parser << data
- dispatch if @has_response
end
def send(request, **)
- if @requests.size >= @max_concurrent_requests
+ if @max_requests.positive? &&
+ @requests.size >= @max_concurrent_requests
@pending << request
return
end
- @requests << request unless @requests.include?(request)
+ unless @requests.include?(request)
+ @requests << request
+ @pipelining = true if @requests.size > 1
+ end
handle(request)
end
def reenqueue!
requests = @requests.dup
@@ -67,103 +68,122 @@
# HTTP Parser callbacks
#
# must be public methods, or else they won't be reachable
- def on_message_begin
+ def on_start
log(level: 2) { "parsing begins" }
end
- def on_headers_complete(h)
- return on_trailer_headers_complete(h) if @parser_trailers
- # Wait for fix: https://github.com/tmm1/http_parser.rb/issues/52
- # callback is called 2 times when chunked
- request = @requests.first
- return if request.response
+ def on_headers(h)
+ @request = @requests.first
+ return if @request.response
log(level: 2) { "headers received" }
headers = @options.headers_class.new(h)
- response = @options.response_class.new(@requests.last,
+ response = @options.response_class.new(@request,
@parser.status_code,
@parser.http_version.join("."),
headers, @options)
log(color: :yellow) { "-> HEADLINE: #{response.status} HTTP/#{@parser.http_version.join(".")}" }
log(color: :yellow) { response.headers.each.map { |f, v| "-> HEADER: #{f}: #{v}" }.join("\n") }
- request.response = response
+ @request.response = response
+ on_complete if response.complete?
+ end
- @has_response = true if response.complete?
+ def on_trailers(h)
+ return unless @request
+ response = @request.response
+ log(level: 2) { "trailer headers received" }
+
+ log(color: :yellow) { h.each.map { |f, v| "-> HEADER: #{f}: #{v}" }.join("\n") }
+ response.merge_headers(h)
end
- def on_body(chunk)
+ def on_data(chunk)
+ return unless @request
log(color: :green) { "-> DATA: #{chunk.bytesize} bytes..." }
log(level: 2, color: :green) { "-> #{chunk.inspect}" }
- response = @requests.first.response
+ response = @request.response
response << chunk
-
- @has_response = response.complete?
end
- def on_message_complete
+ def on_complete
+ return unless @request
log(level: 2) { "parsing complete" }
- request = @requests.first
- response = request.response
-
- if !@parser_trailers && response.headers.key?("trailer")
- @parser_trailers = true
- # this is needed, because the parser can't accept further headers.
- # we need to reset it and artificially move it to receive headers state,
- # hence the bogus headline
- #
- @parser.reset!
- @parser << "#{request.verb.to_s.upcase} #{request.path} HTTP/#{response.version}#{CRLF}"
- else
- @has_response = true
- end
+ dispatch
end
- def on_trailer_headers_complete(h)
- response = @requests.first.response
-
- response.merge_headers(h)
- end
-
def dispatch
- request = @requests.first
- return handle(request) if request.expects?
+ if @request.expects?
+ reset
+ return handle(@request)
+ end
+ request = @request
+ @request = nil
@requests.shift
response = request.response
emit(:response, request, response)
if @parser.upgrade?
response << @parser.upgrade_data
throw(:called)
end
+
reset
+ @max_requests -= 1
send(@pending.shift) unless @pending.empty?
- return unless response.headers["connection"] == "close"
- disable_concurrency
- emit(:reset)
+ manage_connection(response)
end
def handle_error(ex)
- @requests.each do |request|
- emit(:error, request, ex)
+ if @pipelining
+ disable_pipelining
+ emit(:reset)
+ throw(:called)
+ else
+ @requests.each do |request|
+ emit(:error, request, ex)
+ end
end
end
private
- def disable_concurrency
+ def manage_connection(response)
+ connection = response.headers["connection"]
+ case connection
+ when /keep\-alive/i
+ keep_alive = response.headers["keep-alive"]
+ return unless keep_alive
+ parameters = Hash[keep_alive.split(/ *, */).map do |pair|
+ pair.split(/ *= */)
+ end]
+ @max_requests = parameters["max"].to_i if parameters.key?("max")
+ if parameters.key?("timeout")
+ keep_alive_timeout = parameters["timeout"].to_i
+ emit(:timeout, keep_alive_timeout)
+ end
+ # TODO: on keep alive timeout, reset
+ when /close/i, nil
+ disable_pipelining
+ @max_requests = Float::INFINITY
+ emit(:reset)
+ end
+ end
+
+ def disable_pipelining
return if @requests.empty?
@requests.each { |r| r.transition(:idle) }
# server doesn't handle pipelining, and probably
# doesn't support keep-alive. Fallback to send only
# 1 keep alive request.
@max_concurrent_requests = 1
+ @pipelining = false
end
def set_request_headers(request)
request.headers["host"] ||= request.authority
request.headers["connection"] ||= "keep-alive"
@@ -176,10 +196,9 @@
def headline_uri(request)
request.path
end
def handle(request)
- @has_response = false
set_request_headers(request)
catch(:buffer_full) do
request.transition(:headers)
join_headers(request) if request.state == :headers
request.transition(:body)