lib/httpx/adapters/faraday.rb in httpx-0.24.1 vs lib/httpx/adapters/faraday.rb in httpx-0.24.2
- old
+ new
@@ -33,12 +33,50 @@
end
module RequestMixin
using ::HTTPX::HashExtensions
+ def build_connection(env)
+ return @connection if defined?(@connection)
+
+ @connection = ::HTTPX.plugin(:compression).plugin(:persistent).plugin(ReasonPlugin)
+ @connection = @connection.with(@connection_options) unless @connection_options.empty?
+ @connection = @connection.with(options_from_env(env))
+
+ if (proxy = env.request.proxy)
+ proxy_options = { uri: proxy.uri }
+ proxy_options[:username] = proxy.user if proxy.user
+ proxy_options[:password] = proxy.password if proxy.password
+
+ @connection = @connection.plugin(:proxy).with(proxy: proxy_options)
+ end
+ @connection = @connection.plugin(OnDataPlugin) if env.request.stream_response?
+
+ @connection
+ end
+
+ def close
+ @session.close
+ end
+
private
+ def connect(env, &blk)
+ connection(env, &blk)
+ rescue ::HTTPX::TLSError => e
+ raise SSL_ERROR, e
+ rescue Errno::ECONNABORTED,
+ Errno::ECONNREFUSED,
+ Errno::ECONNRESET,
+ Errno::EHOSTUNREACH,
+ Errno::EINVAL,
+ Errno::ENETUNREACH,
+ Errno::EPIPE,
+ ::HTTPX::ConnectionError => e
+ raise CONNECTION_FAILED_ERROR, e
+ end
+
def build_request(env)
meth = env[:method]
request_options = {
headers: env.request_headers,
@@ -46,32 +84,40 @@
}
[meth.to_s.upcase, env.url, request_options]
end
def options_from_env(env)
- timeout_options = {
- connect_timeout: env.request.open_timeout,
- operation_timeout: env.request.timeout,
- }.compact
+ timeout_options = {}
+ if (sec = request_timeout(:read, env))
+ timeout_options[:operation_timeout] = sec
+ end
- options = {
- ssl: {},
- timeout: timeout_options,
- }
+ if (sec = request_timeout(:write, env))
+ timeout_options[:operation_timeout] = sec
+ end
- options[:ssl][:verify_mode] = OpenSSL::SSL::VERIFY_PEER if env.ssl.verify
- options[:ssl][:ca_file] = env.ssl.ca_file if env.ssl.ca_file
- options[:ssl][:ca_path] = env.ssl.ca_path if env.ssl.ca_path
- options[:ssl][:cert_store] = env.ssl.cert_store if env.ssl.cert_store
- options[:ssl][:cert] = env.ssl.client_cert if env.ssl.client_cert
- options[:ssl][:key] = env.ssl.client_key if env.ssl.client_key
- options[:ssl][:ssl_version] = env.ssl.version if env.ssl.version
- options[:ssl][:verify_depth] = env.ssl.verify_depth if env.ssl.verify_depth
- options[:ssl][:min_version] = env.ssl.min_version if env.ssl.min_version
- options[:ssl][:max_version] = env.ssl.max_version if env.ssl.max_version
+ if (sec = request_timeout(:open, env))
+ timeout_options[:connect_timeout] = sec
+ end
- options
+ ssl_options = {}
+
+ ssl_options[:verify_mode] = OpenSSL::SSL::VERIFY_PEER if env.ssl.verify
+ ssl_options[:ca_file] = env.ssl.ca_file if env.ssl.ca_file
+ ssl_options[:ca_path] = env.ssl.ca_path if env.ssl.ca_path
+ ssl_options[:cert_store] = env.ssl.cert_store if env.ssl.cert_store
+ ssl_options[:cert] = env.ssl.client_cert if env.ssl.client_cert
+ ssl_options[:key] = env.ssl.client_key if env.ssl.client_key
+ ssl_options[:ssl_version] = env.ssl.version if env.ssl.version
+ ssl_options[:verify_depth] = env.ssl.verify_depth if env.ssl.verify_depth
+ ssl_options[:min_version] = env.ssl.min_version if env.ssl.min_version
+ ssl_options[:max_version] = env.ssl.max_version if env.ssl.max_version
+
+ {
+ ssl: ssl_options,
+ timeout: timeout_options,
+ }
end
end
include RequestMixin
@@ -120,14 +166,10 @@
end
end
end
end
- def self.session
- @session ||= ::HTTPX.plugin(:compression).plugin(:persistent).plugin(ReasonPlugin)
- end
-
class ParallelManager
class ResponseHandler < SimpleDelegator
attr_reader :env
def initialize(env)
@@ -156,12 +198,13 @@
end
end
include RequestMixin
- def initialize
+ def initialize(options)
@handlers = []
+ @connection_options = options
end
def enqueue(request)
handler = ResponseHandler.new(request)
@handlers << handler
@@ -171,44 +214,56 @@
def run
return unless @handlers.last
env = @handlers.last.env
- session = HTTPX.session.with(options_from_env(env))
- session = session.plugin(:proxy).with(proxy: { uri: env.request.proxy }) if env.request.proxy
- session = session.plugin(OnDataPlugin) if env.request.stream_response?
+ connect(env) do |session|
+ requests = @handlers.map { |handler| session.build_request(*build_request(handler.env)) }
- requests = @handlers.map { |handler| session.build_request(*build_request(handler.env)) }
+ if env.request.stream_response?
+ requests.each do |request|
+ request.response_on_data = env.request.on_data
+ end
+ end
- if env.request.stream_response?
- requests.each do |request|
- request.response_on_data = env.request.on_data
+ responses = session.request(*requests)
+ Array(responses).each_with_index do |response, index|
+ handler = @handlers[index]
+ handler.on_response.call(response)
+ handler.on_complete.call(handler.env)
end
end
+ end
- responses = session.request(*requests)
- Array(responses).each_with_index do |response, index|
- handler = @handlers[index]
- handler.on_response.call(response)
- handler.on_complete.call(handler.env)
+ # from Faraday::Adapter#connection
+ def connection(env)
+ conn = build_connection(env)
+ return conn unless block_given?
+
+ yield conn
+ end
+
+ private
+
+ # from Faraday::Adapter#request_timeout
+ def request_timeout(type, options)
+ key = Faraday::Adapter::TIMEOUT_KEYS.fetch(type) do
+ msg = "Expected :read, :write, :open. Got #{type.inspect} :("
+ raise ArgumentError, msg
end
+ options[key] || options[:timeout]
end
end
self.supports_parallel = true
class << self
- def setup_parallel_manager
- ParallelManager.new
+ def setup_parallel_manager(options = {})
+ ParallelManager.new(options)
end
end
- def initialize(app, options = {})
- super(app)
- @session_options = options
- end
-
def call(env)
super
if parallel?(env)
handler = env[:parallel_manager].enqueue(env)
handler.on_response do |response|
@@ -222,40 +277,30 @@
end
end
return handler
end
- session = HTTPX.session
- session = session.with(@session_options) unless @session_options.empty?
- session = session.with(options_from_env(env))
- session = session.plugin(:proxy).with(proxy: { uri: env.request.proxy }) if env.request.proxy
- session = session.plugin(OnDataPlugin) if env.request.stream_response?
-
- request = session.build_request(*build_request(env))
-
- request.response_on_data = env.request.on_data if env.request.stream_response?
-
- response = session.request(request)
- # do not call #raise_for_status for HTTP 4xx or 5xx, as faraday has a middleware for that.
- response.raise_for_status unless response.is_a?(::HTTPX::Response)
+ response = connect_and_request(env)
save_response(env, response.status, response.body.to_s, response.headers, response.reason) do |response_headers|
response_headers.merge!(response.headers)
end
@app.call(env)
- rescue ::HTTPX::TLSError => e
- raise SSL_ERROR, e
- rescue Errno::ECONNABORTED,
- Errno::ECONNREFUSED,
- Errno::ECONNRESET,
- Errno::EHOSTUNREACH,
- Errno::EINVAL,
- Errno::ENETUNREACH,
- Errno::EPIPE,
- ::HTTPX::ConnectionError => e
- raise CONNECTION_FAILED_ERROR, e
end
private
+
+ def connect_and_request(env)
+ connect(env) do |session|
+ request = session.build_request(*build_request(env))
+
+ request.response_on_data = env.request.on_data if env.request.stream_response?
+
+ response = session.request(request)
+ # do not call #raise_for_status for HTTP 4xx or 5xx, as faraday has a middleware for that.
+ response.raise_for_status unless response.is_a?(::HTTPX::Response)
+ response
+ end
+ end
def parallel?(env)
env[:parallel_manager]
end
end