lib/opentelemetry/exporter/otlp/exporter.rb in opentelemetry-exporter-otlp-0.8.0 vs lib/opentelemetry/exporter/otlp/exporter.rb in opentelemetry-exporter-otlp-0.9.0

- old
+ new

@@ -2,10 +2,11 @@ # Copyright The OpenTelemetry Authors # # SPDX-License-Identifier: Apache-2.0 +require 'opentelemetry/common' require 'opentelemetry/sdk' require 'net/http' require 'csv' require 'zlib' @@ -19,18 +20,18 @@ module OTLP # An OpenTelemetry trace exporter that sends spans over HTTP as Protobuf encoded OTLP ExportTraceServiceRequests. class Exporter # rubocop:disable Metrics/ClassLength SUCCESS = OpenTelemetry::SDK::Trace::Export::SUCCESS FAILURE = OpenTelemetry::SDK::Trace::Export::FAILURE - private_constant(:SUCCESS, :FAILURE) + TIMEOUT = OpenTelemetry::SDK::Trace::Export::TIMEOUT + private_constant(:SUCCESS, :FAILURE, :TIMEOUT) # Default timeouts in seconds. KEEP_ALIVE_TIMEOUT = 30 - OPEN_TIMEOUT = 5 - READ_TIMEOUT = 5 RETRY_COUNT = 5 - private_constant(:KEEP_ALIVE_TIMEOUT, :OPEN_TIMEOUT, :READ_TIMEOUT, :RETRY_COUNT) + WRITE_TIMEOUT_SUPPORTED = Gem::Version.new(RUBY_VERSION) >= Gem::Version.new('2.6') + private_constant(:KEEP_ALIVE_TIMEOUT, :RETRY_COUNT, :WRITE_TIMEOUT_SUPPORTED) def initialize(endpoint: config_opt('OTEL_EXPORTER_OTLP_SPAN_ENDPOINT', 'OTEL_EXPORTER_OTLP_ENDPOINT', default: 'localhost:55681/v1/trace'), # rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity insecure: config_opt('OTEL_EXPORTER_OTLP_SPAN_INSECURE', 'OTEL_EXPORTER_OTLP_INSECURE', default: false), certificate_file: config_opt('OTEL_EXPORTER_OTLP_SPAN_CERTIFICATE', 'OTEL_EXPORTER_OTLP_CERTIFICATE'), headers: config_opt('OTEL_EXPORTER_OTLP_SPAN_HEADERS', 'OTEL_EXPORTER_OTLP_HEADERS'), # TODO: what format is expected here? @@ -43,12 +44,10 @@ uri = URI "http://#{endpoint}" @http = Net::HTTP.new(uri.host, uri.port) @http.use_ssl = insecure.to_s.downcase == 'false' @http.ca_file = certificate_file unless certificate_file.nil? @http.keep_alive_timeout = KEEP_ALIVE_TIMEOUT - @http.open_timeout = OPEN_TIMEOUT - @http.read_timeout = READ_TIMEOUT @path = uri.path @headers = case headers when String then CSV.parse(headers, col_sep: ':', row_sep: ',').to_h when Hash then headers @@ -62,21 +61,24 @@ # Called to export sampled {OpenTelemetry::SDK::Trace::SpanData} structs. # # @param [Enumerable<OpenTelemetry::SDK::Trace::SpanData>] span_data the # list of recorded {OpenTelemetry::SDK::Trace::SpanData} structs to be # exported. + # @param [optional Numeric] timeout An optional timeout in seconds. # @return [Integer] the result of the export. - def export(span_data) + def export(span_data, timeout: nil) return FAILURE if @shutdown - send_bytes(encode(span_data)) + send_bytes(encode(span_data), timeout: timeout) end # Called when {OpenTelemetry::SDK::Trace::Tracer#shutdown} is called, if # this exporter is registered to a {OpenTelemetry::SDK::Trace::Tracer} # object. - def shutdown + # + # @param [optional Numeric] timeout An optional timeout in seconds. + def shutdown(timeout: nil) @shutdown = true @http.finish if @http.started? end private @@ -106,12 +108,14 @@ uri.path.nil? || uri.path.empty? rescue URI::InvalidURIError true end - def send_bytes(bytes) # rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity, Metrics/MethodLength, Metrics/PerceivedComplexity + def send_bytes(bytes, timeout:) # rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity, Metrics/MethodLength, Metrics/PerceivedComplexity retry_count = 0 + timeout ||= @timeout + start_time = Time.now untraced do # rubocop:disable Metrics/BlockLength request = Net::HTTP::Post.new(@path) request.body = if @compression == 'gzip' request.add_field('Content-Encoding', 'gzip') Zlib.gzip(bytes) @@ -119,10 +123,16 @@ bytes end request.add_field('Content-Type', 'application/x-protobuf') @headers&.each { |key, value| request.add_field(key, value) } + remaining_timeout = OpenTelemetry::Common::Utilities.maybe_timeout(timeout, start_time) + return TIMEOUT if remaining_timeout.zero? + + @http.open_timeout = remaining_timeout + @http.read_timeout = remaining_timeout + @http.write_timeout = remaining_timeout if WRITE_TIMEOUT_SUPPORTED @http.start unless @http.started? response = @http.request(request) case response when Net::HTTPOK @@ -150,20 +160,25 @@ end rescue Net::OpenTimeout, Net::ReadTimeout retry if backoff?(retry_count: retry_count += 1) return FAILURE end + ensure + # Reset timeouts to defaults for the next call. + @http.open_timeout = @timeout + @http.read_timeout = @timeout + @http.write_timeout = @timeout if WRITE_TIMEOUT_SUPPORTED end def handle_redirect(location) # TODO: figure out destination and reinitialize @http and @path end def untraced OpenTelemetry::Trace.with_span(OpenTelemetry::Trace::Span.new) { yield } end - def backoff?(retry_after: nil, retry_count:, reason:) + def backoff?(retry_after: nil, retry_count:) return false if retry_count > RETRY_COUNT sleep_interval = nil unless retry_after.nil? sleep_interval =