lib/httpx/adapters/faraday.rb in httpx-0.24.1 vs lib/httpx/adapters/faraday.rb in httpx-0.24.2

- old
+ new

@@ -33,12 +33,50 @@ end module RequestMixin using ::HTTPX::HashExtensions + def build_connection(env) + return @connection if defined?(@connection) + + @connection = ::HTTPX.plugin(:compression).plugin(:persistent).plugin(ReasonPlugin) + @connection = @connection.with(@connection_options) unless @connection_options.empty? + @connection = @connection.with(options_from_env(env)) + + if (proxy = env.request.proxy) + proxy_options = { uri: proxy.uri } + proxy_options[:username] = proxy.user if proxy.user + proxy_options[:password] = proxy.password if proxy.password + + @connection = @connection.plugin(:proxy).with(proxy: proxy_options) + end + @connection = @connection.plugin(OnDataPlugin) if env.request.stream_response? + + @connection + end + + def close + @session.close + end + private + def connect(env, &blk) + connection(env, &blk) + rescue ::HTTPX::TLSError => e + raise SSL_ERROR, e + rescue Errno::ECONNABORTED, + Errno::ECONNREFUSED, + Errno::ECONNRESET, + Errno::EHOSTUNREACH, + Errno::EINVAL, + Errno::ENETUNREACH, + Errno::EPIPE, + ::HTTPX::ConnectionError => e + raise CONNECTION_FAILED_ERROR, e + end + def build_request(env) meth = env[:method] request_options = { headers: env.request_headers, @@ -46,32 +84,40 @@ } [meth.to_s.upcase, env.url, request_options] end def options_from_env(env) - timeout_options = { - connect_timeout: env.request.open_timeout, - operation_timeout: env.request.timeout, - }.compact + timeout_options = {} + if (sec = request_timeout(:read, env)) + timeout_options[:operation_timeout] = sec + end - options = { - ssl: {}, - timeout: timeout_options, - } + if (sec = request_timeout(:write, env)) + timeout_options[:operation_timeout] = sec + end - options[:ssl][:verify_mode] = OpenSSL::SSL::VERIFY_PEER if env.ssl.verify - options[:ssl][:ca_file] = env.ssl.ca_file if env.ssl.ca_file - options[:ssl][:ca_path] = env.ssl.ca_path if env.ssl.ca_path - options[:ssl][:cert_store] = env.ssl.cert_store if env.ssl.cert_store - options[:ssl][:cert] = env.ssl.client_cert if env.ssl.client_cert - options[:ssl][:key] = env.ssl.client_key if env.ssl.client_key - options[:ssl][:ssl_version] = env.ssl.version if env.ssl.version - options[:ssl][:verify_depth] = env.ssl.verify_depth if env.ssl.verify_depth - options[:ssl][:min_version] = env.ssl.min_version if env.ssl.min_version - options[:ssl][:max_version] = env.ssl.max_version if env.ssl.max_version + if (sec = request_timeout(:open, env)) + timeout_options[:connect_timeout] = sec + end - options + ssl_options = {} + + ssl_options[:verify_mode] = OpenSSL::SSL::VERIFY_PEER if env.ssl.verify + ssl_options[:ca_file] = env.ssl.ca_file if env.ssl.ca_file + ssl_options[:ca_path] = env.ssl.ca_path if env.ssl.ca_path + ssl_options[:cert_store] = env.ssl.cert_store if env.ssl.cert_store + ssl_options[:cert] = env.ssl.client_cert if env.ssl.client_cert + ssl_options[:key] = env.ssl.client_key if env.ssl.client_key + ssl_options[:ssl_version] = env.ssl.version if env.ssl.version + ssl_options[:verify_depth] = env.ssl.verify_depth if env.ssl.verify_depth + ssl_options[:min_version] = env.ssl.min_version if env.ssl.min_version + ssl_options[:max_version] = env.ssl.max_version if env.ssl.max_version + + { + ssl: ssl_options, + timeout: timeout_options, + } end end include RequestMixin @@ -120,14 +166,10 @@ end end end end - def self.session - @session ||= ::HTTPX.plugin(:compression).plugin(:persistent).plugin(ReasonPlugin) - end - class ParallelManager class ResponseHandler < SimpleDelegator attr_reader :env def initialize(env) @@ -156,12 +198,13 @@ end end include RequestMixin - def initialize + def initialize(options) @handlers = [] + @connection_options = options end def enqueue(request) handler = ResponseHandler.new(request) @handlers << handler @@ -171,44 +214,56 @@ def run return unless @handlers.last env = @handlers.last.env - session = HTTPX.session.with(options_from_env(env)) - session = session.plugin(:proxy).with(proxy: { uri: env.request.proxy }) if env.request.proxy - session = session.plugin(OnDataPlugin) if env.request.stream_response? + connect(env) do |session| + requests = @handlers.map { |handler| session.build_request(*build_request(handler.env)) } - requests = @handlers.map { |handler| session.build_request(*build_request(handler.env)) } + if env.request.stream_response? + requests.each do |request| + request.response_on_data = env.request.on_data + end + end - if env.request.stream_response? - requests.each do |request| - request.response_on_data = env.request.on_data + responses = session.request(*requests) + Array(responses).each_with_index do |response, index| + handler = @handlers[index] + handler.on_response.call(response) + handler.on_complete.call(handler.env) end end + end - responses = session.request(*requests) - Array(responses).each_with_index do |response, index| - handler = @handlers[index] - handler.on_response.call(response) - handler.on_complete.call(handler.env) + # from Faraday::Adapter#connection + def connection(env) + conn = build_connection(env) + return conn unless block_given? + + yield conn + end + + private + + # from Faraday::Adapter#request_timeout + def request_timeout(type, options) + key = Faraday::Adapter::TIMEOUT_KEYS.fetch(type) do + msg = "Expected :read, :write, :open. Got #{type.inspect} :(" + raise ArgumentError, msg end + options[key] || options[:timeout] end end self.supports_parallel = true class << self - def setup_parallel_manager - ParallelManager.new + def setup_parallel_manager(options = {}) + ParallelManager.new(options) end end - def initialize(app, options = {}) - super(app) - @session_options = options - end - def call(env) super if parallel?(env) handler = env[:parallel_manager].enqueue(env) handler.on_response do |response| @@ -222,40 +277,30 @@ end end return handler end - session = HTTPX.session - session = session.with(@session_options) unless @session_options.empty? - session = session.with(options_from_env(env)) - session = session.plugin(:proxy).with(proxy: { uri: env.request.proxy }) if env.request.proxy - session = session.plugin(OnDataPlugin) if env.request.stream_response? - - request = session.build_request(*build_request(env)) - - request.response_on_data = env.request.on_data if env.request.stream_response? - - response = session.request(request) - # do not call #raise_for_status for HTTP 4xx or 5xx, as faraday has a middleware for that. - response.raise_for_status unless response.is_a?(::HTTPX::Response) + response = connect_and_request(env) save_response(env, response.status, response.body.to_s, response.headers, response.reason) do |response_headers| response_headers.merge!(response.headers) end @app.call(env) - rescue ::HTTPX::TLSError => e - raise SSL_ERROR, e - rescue Errno::ECONNABORTED, - Errno::ECONNREFUSED, - Errno::ECONNRESET, - Errno::EHOSTUNREACH, - Errno::EINVAL, - Errno::ENETUNREACH, - Errno::EPIPE, - ::HTTPX::ConnectionError => e - raise CONNECTION_FAILED_ERROR, e end private + + def connect_and_request(env) + connect(env) do |session| + request = session.build_request(*build_request(env)) + + request.response_on_data = env.request.on_data if env.request.stream_response? + + response = session.request(request) + # do not call #raise_for_status for HTTP 4xx or 5xx, as faraday has a middleware for that. + response.raise_for_status unless response.is_a?(::HTTPX::Response) + response + end + end def parallel?(env) env[:parallel_manager] end end