lib/httpx/channel.rb in httpx-0.1.0 vs lib/httpx/channel.rb in httpx-0.2.0
- old
+ new
@@ -48,41 +48,81 @@
when "https" then "ssl"
else
raise Error, "#{uri}: #{uri.scheme}: unrecognized channel"
end
end
- io = IO.registry(type).new(uri, options)
- new(io, options)
+ new(type, uri, options)
end
end
def_delegator :@io, :closed?
def_delegator :@write_buffer, :empty?
- def initialize(io, options)
- @io = io
+ attr_reader :uri, :state
+
+ def initialize(type, uri, options)
+ @type = type
+ @uri = uri
+ @hostnames = [@uri.host]
@options = Options.new(options)
@window_size = @options.window_size
@read_buffer = Buffer.new(BUFFER_SIZE)
@write_buffer = Buffer.new(BUFFER_SIZE)
@pending = []
@state = :idle
on(:error) { |ex| on_error(ex) }
end
+ def addresses=(addrs)
+ @io = IO.registry(@type).new(@uri, addrs, @options)
+ end
+
+ def mergeable?(addresses)
+ return false if @state == :closing || !@io
+ !(@io.addresses & addresses).empty?
+ end
+
+ # coalescable channels need to be mergeable!
+ # but internally, #mergeable? is called before #coalescable?
+ def coalescable?(channel)
+ if @io.protocol == "h2" && @uri.scheme == "https"
+ @io.verify_hostname(channel.uri.host)
+ else
+ @uri.host == channel.uri.host &&
+ @uri.port == channel.uri.port &&
+ @uri.scheme == channel.uri.scheme
+ end
+ end
+
+ def merge(channel)
+ @hostnames += channel.instance_variable_get(:@hostnames)
+ pending = channel.instance_variable_get(:@pending)
+ pending.each do |req, args|
+ send(req, args)
+ end
+ end
+
+ def unmerge(channel)
+ @hostnames -= channel.instance_variable_get(:@hostnames)
+ [@parser.pending, @pending].each do |pending|
+ pending.reject! do |request|
+ request.uri == channel.uri && begin
+ request.transition(:idle)
+ channel.send(request)
+ true
+ end
+ end
+ end
+ end
+
def match?(uri)
return false if @state == :closing
- ips = begin
- Resolv.getaddresses(uri.host)
- rescue StandardError
- [uri.host]
- end
- ips.include?(@io.ip) &&
- uri.port == @io.port &&
- uri.scheme == @io.scheme
+ @hostnames.include?(uri.host) &&
+ uri.port == @uri.port &&
+ uri.scheme == @uri.scheme
end
def interests
return :w if @state == :idle
readable = !@read_buffer.full?
@@ -112,11 +152,13 @@
transition(:closed)
emit(:close)
end
def send(request, **args)
- if @parser && !@write_buffer.full?
+ if @error_response
+ emit(:response, request, @error_response)
+ elsif @parser && !@write_buffer.full?
parser.send(request, **args)
else
@pending << [request, args]
end
end
@@ -209,37 +251,49 @@
transition(:idle)
transition(:open)
end
end
parser.on(:error) do |request, ex|
- response = ErrorResponse.new(ex, @options)
- emit(:response, request, response)
+ case ex
+ when MisdirectedRequestError
+ emit(:uncoalesce, request.uri)
+ else
+ response = ErrorResponse.new(ex, @options)
+ emit(:response, request, response)
+ end
end
parser
end
def transition(nextstate)
case nextstate
# when :idle
-
+ when :idle
+ @error_response = nil
when :open
return if @state == :closed
@io.connect
return unless @io.connected?
send_pending
+ emit(:open)
when :closing
return unless @state == :open
when :closed
return unless @state == :closing
return unless @write_buffer.empty?
@io.close
@read_buffer.clear
end
@state = nextstate
+ rescue Errno::EHOSTUNREACH
+ # at this point, all addresses from the IO object have failed
+ reset
+ emit(:unreachable)
+ throw(:jump_tick)
rescue Errno::ECONNREFUSED,
- Errno::ENETUNREACH,
Errno::EADDRNOTAVAIL,
+ Errno::EHOSTUNREACH,
OpenSSL::SSL::SSLError => e
# connect errors, exit gracefully
handle_error(e)
@state = :closed
emit(:close)
@@ -249,13 +303,13 @@
handle_error(ex)
reset
end
def handle_error(e)
- parser.handle_error(e) if parser.respond_to?(:handle_error)
- response = ErrorResponse.new(e, @options)
+ parser.handle_error(e) if @parser && parser.respond_to?(:handle_error)
+ @error_response = ErrorResponse.new(e, @options)
@pending.each do |request, _|
- emit(:response, request, response)
+ emit(:response, request, @error_response)
end
end
end
end