lib/net/http/pipeline.rb in net-http-pipeline-0.1 vs lib/net/http/pipeline.rb in net-http-pipeline-1.0
- old
+ new
@@ -1,75 +1,258 @@
require 'net/http'
##
-# An HTTP/1.1 pipelining implementation atop Net::HTTP. Currently this is not
+# An HTTP/1.1 pipelining implementation atop Net::HTTP. This library is
# compliant with RFC 2616 8.1.2.2.
#
-# Pipeline allows pou to create a bunch of requests then pipeline them to an
-# HTTP/1.1 server.
+# Pipeline allows you to create a bunch of requests then send them all to an
+# HTTP/1.1 server without waiting for responses. The server will return HTTP
+# responses in-order.
#
+# Net::HTTP::Pipeline does not assume the server supports pipelining. If you
+# know the server supports pipelining you can set Net::HTTP#pipelining to
+# true.
+#
# = Example
#
# require 'net/http/pipeline'
#
# Net::HTTP.start 'localhost' do |http|
-# req1 = Net::HTTP::Get.new '/'
-# req2 = Net::HTTP::Get.new '/'
+# requests = []
+# requests << Net::HTTP::Get.new('/')
+# requests << Net::HTTP::Get.new('/')
+# requests << Net::HTTP::Get.new('/')
#
-# http.pipeline req1, req2 do |res|
+# http.pipeline requests do |res|
# puts res.code
# puts res.body[0..60].inspect
# puts
# end
# end
module Net::HTTP::Pipeline
- VERSION = '0.1'
+ ##
+ # The version of net-http-pipeline you are using
+ VERSION = '1.0'
+
##
# Pipeline error class
class Error < RuntimeError
+
+ ##
+ # Remaining requests that have not been sent to the HTTP server
+
+ attr_reader :requests
+
+ ##
+ # Retrieved responses up to the error point
+
+ attr_reader :responses
+
+ ##
+ # Creates a new Error with +message+, a list of +requests+ that have not
+ # been sent to the server and a list of +responses+ that have been
+ # retrieved from the server.
+
+ def initialize message, requests, responses
+ super message
+
+ @requests = requests
+ @responses = responses
+ end
+
end
##
+ # Raised when an invalid version is given
+
+ class VersionError < Error
+ ##
+ # Creates a new VersionError with a list of +requests+ that have not been
+ # sent to the server and a list of +responses+ that have been retrieved
+ # from the server.
+
+ def initialize requests, responses
+ super 'HTTP/1.1 or newer required', requests, responses
+ end
+ end
+
+ ##
+ # Raised when the server appears to not support persistent connections
+
+ class PersistenceError < Error
+ ##
+ # Creates a new PersistenceError with a list of +requests+ that have not
+ # been sent to the server and a list of +responses+ that have been
+ # retrieved from the server.
+
+ def initialize requests, responses
+ super 'persistent connections required', requests, responses
+ end
+ end
+
+ ##
+ # Raised when the server appears to not support pipelining connections
+
+ class PipelineError < Error
+ ##
+ # Creates a new PipelineError with a list of +requests+ that have not been
+ # sent to the server and a list of +responses+ that have been retrieved
+ # from the server.
+
+ def initialize requests, responses
+ super 'pipeline connections are not supported', requests, responses
+ end
+ end
+
+ ##
+ # Raised if an error occurs while reading responses.
+
+ class ResponseError < Error
+ ##
+ # The original exception
+
+ attr_accessor :original
+
+ ##
+ # Creates a new ResponseError with an original +exception+, a list of
+ # +requests+ that were in-flight and a list of +responses+ that have been
+ # retrieved from the server.
+
+ def initialize exception, requests, responses
+ @original = exception
+ message = "error handling responses: #{original} (#{original.class})"
+ super message, requests, responses
+ end
+ end
+
+ ##
+ # Pipelining capability accessor.
+ #
+ # Pipeline assumes servers do not support pipelining by default. The first
+ # request is not pipelined while Pipeline ensures that the server is
+ # HTTP/1.1 or newer and defaults to persistent connections.
+ #
+ # If you know the server is HTTP/1.1 and defaults to persistent
+ # connections you can set this to true when you create the Net::HTTP object.
+
+ attr_accessor :pipelining
+
+ ##
+ # Is +req+ idempotent according to RFC 2616?
+
+ def idempotent? req
+ case req
+ when Net::HTTP::Delete, Net::HTTP::Get, Net::HTTP::Head,
+ Net::HTTP::Options, Net::HTTP::Put, Net::HTTP::Trace then
+ true
+ end
+ end
+
+ ##
# Pipelines +requests+ to the HTTP server yielding responses if a block is
# given. Returns all responses recieved.
#
- # Raises an exception if the connection is not pipelining-capable or if the
+ # The Net::HTTP connection must be started before calling #pipeline.
+ #
+ # Raises an exception if the connection is not pipeline-capable or if the
# HTTP session has not been started.
- def pipeline *requests
- raise Error, 'pipelining requires HTTP/1.1 or newer' unless
- @curr_http_version >= '1.1'
- raise Error, 'Net::HTTP not started' unless started?
+ def pipeline requests, &block # :yields: response
+ responses = []
- requests.each do |req|
- begin_transport req
- req.exec @socket, @curr_http_version, edit_path(req.path)
- end
+ raise Error.new('Net::HTTP not started', requests, responses) unless
+ started?
- responses = []
+ raise VersionError.new(requests, responses) if '1.1' > @curr_http_version
- requests.each do |req|
+ pipeline_check requests, responses, &block
+
+ retried = responses.length
+
+ until requests.empty? do
begin
- res = Net::HTTPResponse.read_new @socket
- end while res.kind_of? Net::HTTPContinue
+ in_flight = pipeline_send requests
- res.reading_body @socket, req.response_body_permitted? do
- responses << res
- yield res if block_given?
- end
+ pipeline_receive in_flight, responses, &block
+ rescue Net::HTTP::Pipeline::ResponseError => e
+ e.requests.reverse_each do |request|
+ requests.unshift request
+ end
- pipeline_end_transport res
+ raise if responses.length == retried or not idempotent? requests.first
+
+ retried = responses.length
+
+ pipeline_reset requests, responses
+
+ retry
+ end
end
responses
end
##
+ # Ensures the connection supports pipelining.
+ #
+ # If the server has not been tested for pipelining support one of the
+ # +requests+ will be consumed and placed in +responses+.
+ #
+ # A VersionError will be raised if the server is not HTTP/1.1 or newer.
+ #
+ # A PersistenceError will be raised if the server does not support
+ # persistent connections.
+ #
+ # A PipelineError will be raised if the it was previously determined that
+ # the server does not support pipelining.
+
+ def pipeline_check requests, responses
+ if instance_variable_defined? :@pipelining then
+ return if @pipelining
+ raise PipelineError.new(requests, responses) unless @pipelining
+ else
+ @pipelining = false
+ end
+
+ req = requests.shift
+ retried = false
+
+ begin
+ res = request req
+ rescue Timeout::Error, EOFError, Errno::ECONNABORTED, Errno::ECONNRESET,
+ Errno::EPIPE, Net::HTTPBadResponse => e
+ if retried then
+ requests.unshift req
+ raise ResponseError.new(e, requests, responses)
+ end
+
+ retried = true
+
+ pipeline_reset requests, responses
+
+ retry
+ end
+
+ responses << res
+
+ yield res if block_given?
+
+ @pipelining = pipeline_keep_alive? res
+
+ if '1.1' > @curr_http_version then
+ @pipelining = false
+ raise VersionError.new(requests, responses)
+ elsif not @pipelining then
+ raise PersistenceError.new(requests, responses)
+ end
+ end
+
+ ##
# Updates the HTTP version and ensures the connection has keep-alive.
def pipeline_end_transport res
@curr_http_version = res.http_version
@@ -81,20 +264,104 @@
D 'Conn close on pipeline'
@socket.close
end
end
+ ##
+ # Closes the connection and rescues any IOErrors this may cause
+
+ def pipeline_finish
+ finish
+ rescue IOError
+ end
+
if Net::HTTPResponse.allocate.respond_to? :connection_close? then
##
# Checks for an connection close header
def pipeline_keep_alive? res
not res.connection_close?
end
else
def pipeline_keep_alive? res
- not /close/i =~ res['connection'].to_s
+ not res['connection'].to_s =~ /close/i
end
+ end
+
+ ##
+ # Receives HTTP responses for +in_flight+ requests and adds them to
+ # +responses+
+
+ def pipeline_receive in_flight, responses
+ while req = in_flight.shift do
+ begin
+ begin
+ res = Net::HTTPResponse.read_new @socket
+ end while res.kind_of? Net::HTTPContinue
+
+ res.reading_body @socket, req.response_body_permitted? do
+ responses << res
+ yield res if block_given?
+ end
+
+ pipeline_end_transport res
+ rescue StandardError, Timeout::Error
+ in_flight.unshift req
+ raise
+ end
+ end
+
+ responses
+ rescue Timeout::Error, EOFError, Errno::ECONNABORTED, Errno::ECONNRESET,
+ Errno::EPIPE, Net::HTTPBadResponse => e
+ pipeline_finish
+
+ raise ResponseError.new(e, in_flight, responses)
+ end
+
+ ##
+ # Resets this connection
+
+ def pipeline_reset requests, responses
+ pipeline_finish
+
+ start
+ rescue Errno::ECONNREFUSED
+ raise Error.new("connection refused: #{address}:#{port}", requests,
+ responses)
+ rescue Errno::EHOSTDOWN
+ raise Error.new("host down: #{address}:#{port}", requests, responses)
+ end
+
+ ##
+ # Sends +requests+ to the HTTP server and removes them from the +requests+
+ # list. Returns the requests that have been pipelined and are in-flight.
+ #
+ # If a non-idempotent request is first in +requests+ it will be sent and no
+ # further requests will be pipelined.
+ #
+ # If a non-idempotent request is encountered after an idempotent request it
+ # will not be sent.
+
+ def pipeline_send requests
+ in_flight = []
+
+ while req = requests.shift do
+ idempotent = idempotent? req
+
+ unless idempotent or in_flight.empty? then
+ requests.unshift req
+ break
+ end
+
+ begin_transport req
+ req.exec @socket, @curr_http_version, edit_path(req.path)
+ in_flight << req
+
+ break unless idempotent
+ end
+
+ in_flight
end
end
class Net::HTTP