lib/httpx/connection.rb in httpx-0.6.7 vs lib/httpx/connection.rb in httpx-0.7.0
- old
+ new
@@ -44,10 +44,12 @@
def_delegator :@write_buffer, :empty?
attr_reader :origin, :state, :pending, :options
+ attr_writer :timers
+
def initialize(type, uri, options)
@type = type
@origins = [uri.origin]
@origin = URI(uri.origin)
@options = Options.new(options)
@@ -78,10 +80,12 @@
end
def match?(uri, options)
return false if @state == :closing || @state == :closed
+ return false if exhausted?
+
(
(
@origins.include?(uri.origin) &&
# if there is more than one origin to match, it means that this connection
# was the result of coalescing. To prevent blind trust in the case where the
@@ -93,10 +97,12 @@
end
def mergeable?(connection)
return false if @state == :closing || @state == :closed || !@io
+ return false if exhausted?
+
!(@io.addresses & connection.addresses).empty? && @options == connection.options
end
# coalescable connections need to be mergeable!
# but internally, #mergeable? is called before #coalescable?
@@ -108,14 +114,17 @@
else
@origin == connection.origin
end
end
+ def create_idle
+ self.class.new(@type, @origin, @options)
+ end
+
def merge(connection)
@origins += connection.instance_variable_get(:@origins)
- pending = connection.instance_variable_get(:@pending)
- pending.each do |req|
+ connection.purge_pending do |req|
send(req)
end
end
def unmerge(connection)
@@ -128,11 +137,14 @@
end
end
end
def purge_pending
- [*@parser.pending, *@pending].each do |pending|
+ pendings = []
+ pendings << @parser.pending if @parser
+ pendings << @pending
+ pendings.each do |pending|
pending.reject! do |request|
yield request
end
end
end
@@ -211,10 +223,14 @@
@options.timeout.operation_timeout
end
private
+ def exhausted?
+ @parser && parser.exhausted?
+ end
+
def consume
catch(:called) do
dread
dwrite
parser.consume
@@ -283,10 +299,13 @@
end
parser.on(:promise) do |request, stream|
request.emit(:promise, parser, stream)
end
+ parser.on(:exhausted) do
+ emit(:exhausted)
+ end
parser.on(:origin) do |origin|
@origins << origin
end
parser.on(:close) do
transition(:closing)
@@ -317,10 +336,12 @@
def transition(nextstate)
case nextstate
when :open
return if @state == :closed
+ total_timeout
+
@io.connect
return unless @io.connected?
send_pending
emit(:open)
@@ -328,10 +349,15 @@
return unless @state == :open
when :closed
return unless @state == :closing
return unless @write_buffer.empty?
+ if @total_timeout
+ @total_timeout.cancel
+ remove_instance_variable(:@total_timeout)
+ end
+
@io.close
@read_buffer.clear
remove_instance_variable(:@timeout) if defined?(@timeout)
when :already_open
nextstate = :open
@@ -369,9 +395,22 @@
end
parser.handle_error(error) if @parser && parser.respond_to?(:handle_error)
while (request = @pending.shift)
request.emit(:response, ErrorResponse.new(request, error, @options))
+ end
+ end
+
+ def total_timeout
+ total = @options.timeout.total_timeout
+
+ return unless total
+
+ @total_timeout ||= @timers.after(total) do
+ ex = TotalTimeoutError.new(total, "Timed out after #{total} seconds")
+ ex.set_backtrace(caller)
+ @parser.close if @parser
+ on_error(ex)
end
end
end
end