lib/opentelemetry/exporter/otlp/exporter.rb in opentelemetry-exporter-otlp-0.8.0 vs lib/opentelemetry/exporter/otlp/exporter.rb in opentelemetry-exporter-otlp-0.9.0
- old
+ new
@@ -2,10 +2,11 @@
# Copyright The OpenTelemetry Authors
#
# SPDX-License-Identifier: Apache-2.0
+require 'opentelemetry/common'
require 'opentelemetry/sdk'
require 'net/http'
require 'csv'
require 'zlib'
@@ -19,18 +20,18 @@
module OTLP
# An OpenTelemetry trace exporter that sends spans over HTTP as Protobuf encoded OTLP ExportTraceServiceRequests.
class Exporter # rubocop:disable Metrics/ClassLength
SUCCESS = OpenTelemetry::SDK::Trace::Export::SUCCESS
FAILURE = OpenTelemetry::SDK::Trace::Export::FAILURE
- private_constant(:SUCCESS, :FAILURE)
+ TIMEOUT = OpenTelemetry::SDK::Trace::Export::TIMEOUT
+ private_constant(:SUCCESS, :FAILURE, :TIMEOUT)
# Default timeouts in seconds.
KEEP_ALIVE_TIMEOUT = 30
- OPEN_TIMEOUT = 5
- READ_TIMEOUT = 5
RETRY_COUNT = 5
- private_constant(:KEEP_ALIVE_TIMEOUT, :OPEN_TIMEOUT, :READ_TIMEOUT, :RETRY_COUNT)
+ WRITE_TIMEOUT_SUPPORTED = Gem::Version.new(RUBY_VERSION) >= Gem::Version.new('2.6')
+ private_constant(:KEEP_ALIVE_TIMEOUT, :RETRY_COUNT, :WRITE_TIMEOUT_SUPPORTED)
def initialize(endpoint: config_opt('OTEL_EXPORTER_OTLP_SPAN_ENDPOINT', 'OTEL_EXPORTER_OTLP_ENDPOINT', default: 'localhost:55681/v1/trace'), # rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity
insecure: config_opt('OTEL_EXPORTER_OTLP_SPAN_INSECURE', 'OTEL_EXPORTER_OTLP_INSECURE', default: false),
certificate_file: config_opt('OTEL_EXPORTER_OTLP_SPAN_CERTIFICATE', 'OTEL_EXPORTER_OTLP_CERTIFICATE'),
headers: config_opt('OTEL_EXPORTER_OTLP_SPAN_HEADERS', 'OTEL_EXPORTER_OTLP_HEADERS'), # TODO: what format is expected here?
@@ -43,12 +44,10 @@
uri = URI "http://#{endpoint}"
@http = Net::HTTP.new(uri.host, uri.port)
@http.use_ssl = insecure.to_s.downcase == 'false'
@http.ca_file = certificate_file unless certificate_file.nil?
@http.keep_alive_timeout = KEEP_ALIVE_TIMEOUT
- @http.open_timeout = OPEN_TIMEOUT
- @http.read_timeout = READ_TIMEOUT
@path = uri.path
@headers = case headers
when String then CSV.parse(headers, col_sep: ':', row_sep: ',').to_h
when Hash then headers
@@ -62,21 +61,24 @@
# Called to export sampled {OpenTelemetry::SDK::Trace::SpanData} structs.
#
# @param [Enumerable<OpenTelemetry::SDK::Trace::SpanData>] span_data the
# list of recorded {OpenTelemetry::SDK::Trace::SpanData} structs to be
# exported.
+ # @param [optional Numeric] timeout An optional timeout in seconds.
# @return [Integer] the result of the export.
- def export(span_data)
+ def export(span_data, timeout: nil)
return FAILURE if @shutdown
- send_bytes(encode(span_data))
+ send_bytes(encode(span_data), timeout: timeout)
end
# Called when {OpenTelemetry::SDK::Trace::Tracer#shutdown} is called, if
# this exporter is registered to a {OpenTelemetry::SDK::Trace::Tracer}
# object.
- def shutdown
+ #
+ # @param [optional Numeric] timeout An optional timeout in seconds.
+ def shutdown(timeout: nil)
@shutdown = true
@http.finish if @http.started?
end
private
@@ -106,12 +108,14 @@
uri.path.nil? || uri.path.empty?
rescue URI::InvalidURIError
true
end
- def send_bytes(bytes) # rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity, Metrics/MethodLength, Metrics/PerceivedComplexity
+ def send_bytes(bytes, timeout:) # rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity, Metrics/MethodLength, Metrics/PerceivedComplexity
retry_count = 0
+ timeout ||= @timeout
+ start_time = Time.now
untraced do # rubocop:disable Metrics/BlockLength
request = Net::HTTP::Post.new(@path)
request.body = if @compression == 'gzip'
request.add_field('Content-Encoding', 'gzip')
Zlib.gzip(bytes)
@@ -119,10 +123,16 @@
bytes
end
request.add_field('Content-Type', 'application/x-protobuf')
@headers&.each { |key, value| request.add_field(key, value) }
+ remaining_timeout = OpenTelemetry::Common::Utilities.maybe_timeout(timeout, start_time)
+ return TIMEOUT if remaining_timeout.zero?
+
+ @http.open_timeout = remaining_timeout
+ @http.read_timeout = remaining_timeout
+ @http.write_timeout = remaining_timeout if WRITE_TIMEOUT_SUPPORTED
@http.start unless @http.started?
response = @http.request(request)
case response
when Net::HTTPOK
@@ -150,20 +160,25 @@
end
rescue Net::OpenTimeout, Net::ReadTimeout
retry if backoff?(retry_count: retry_count += 1)
return FAILURE
end
+ ensure
+ # Reset timeouts to defaults for the next call.
+ @http.open_timeout = @timeout
+ @http.read_timeout = @timeout
+ @http.write_timeout = @timeout if WRITE_TIMEOUT_SUPPORTED
end
def handle_redirect(location)
# TODO: figure out destination and reinitialize @http and @path
end
def untraced
OpenTelemetry::Trace.with_span(OpenTelemetry::Trace::Span.new) { yield }
end
- def backoff?(retry_after: nil, retry_count:, reason:)
+ def backoff?(retry_after: nil, retry_count:)
return false if retry_count > RETRY_COUNT
sleep_interval = nil
unless retry_after.nil?
sleep_interval =