lib/ddtrace/transport/traces.rb in ddtrace-0.34.2 vs lib/ddtrace/transport/traces.rb in ddtrace-0.35.0
- old
+ new
@@ -1,33 +1,183 @@
require 'ddtrace/transport/parcel'
require 'ddtrace/transport/request'
+require 'ddtrace/chunker'
module Datadog
module Transport
module Traces
- # Data transfer object for trace data
- class Parcel
+ # Data transfer object for encoded traces
+ class EncodedParcel
include Transport::Parcel
+ attr_reader :trace_count
+
+ def initialize(data, trace_count)
+ super(data)
+ @trace_count = trace_count
+ end
+
def count
data.length
end
-
- def encode_with(encoder)
- encoder.encode_traces(data)
- end
end
# Traces request
class Request < Transport::Request
- def initialize(traces)
- super(Parcel.new(traces))
- end
end
# Traces response
module Response
- attr_reader :service_rates
+ attr_reader :service_rates, :trace_count
+ end
+
+ # Traces chunker
+ class Chunker
+ # Trace agent limit payload size of 10 MiB (since agent v5.11.0):
+ # https://github.com/DataDog/datadog-agent/blob/6.14.1/pkg/trace/api/api.go#L46
+ #
+ # We set the value to a conservative 5 MiB, in case network speed is slow.
+ DEFAULT_MAX_PAYLOAD_SIZE = 5 * 1024 * 1024
+
+ attr_reader :encoder, :max_size
+
+ #
+ # Single traces larger than +max_size+ will be discarded.
+ #
+ # @param encoder [Datadog::Encoding::Encoder]
+ # @param max_size [String] maximum acceptable payload size
+ def initialize(encoder, max_size: DEFAULT_MAX_PAYLOAD_SIZE)
+ @encoder = encoder
+ @max_size = max_size
+ end
+
+ # Encodes a list of traces in chunks.
+ # Before serializing, all traces are normalized. Trace nesting is not changed.
+ #
+ # @param traces [Enumerable<Trace>] list of traces
+ # @return [Enumerable[Array[Bytes,Integer]]] list of encoded chunks: each containing a byte array and
+ # number of traces
+ def encode_in_chunks(traces)
+ encoded_traces = traces.map { |t| encode_one(t) }.reject(&:nil?)
+
+ Datadog::Chunker.chunk_by_size(encoded_traces, max_size).map do |chunk|
+ [encoder.join(chunk), chunk.size]
+ end
+ end
+
+ private
+
+ def encode_one(trace)
+ encoded = Encoder.encode_trace(encoder, trace)
+
+ if encoded.size > max_size
+ # This single trace is too large, we can't flush it
+ Datadog.logger.debug { "Dropping trace. Payload too large: '#{trace.map(&:to_hash)}'" }
+ Datadog.health_metrics.transport_trace_too_large(1)
+
+ return nil
+ end
+
+ encoded
+ end
+ end
+
+ # Encodes traces using {Datadog::Encoding::Encoder} instances.
+ module Encoder
+ module_function
+
+ def encode_trace(encoder, trace)
+ encoder.encode(trace.map(&:to_hash))
+ end
+ end
+
+ # Sends traces based on transport API configuration.
+ #
+ # This class initializes the HTTP client, breaks down large
+ # batches of traces into smaller chunks and handles
+ # API version downgrade handshake.
+ class Transport
+ attr_reader :client, :apis, :default_api, :current_api_id
+
+ def initialize(apis, default_api)
+ @apis = apis
+ @default_api = default_api
+
+ change_api!(default_api)
+ end
+
+ def send_traces(traces)
+ encoder = current_api.encoder
+ chunker = Datadog::Transport::Traces::Chunker.new(encoder)
+
+ responses = chunker.encode_in_chunks(traces.lazy).map do |encoded_traces, trace_count|
+ request = Request.new(EncodedParcel.new(encoded_traces, trace_count))
+
+ client.send_payload(request).tap do |response|
+ if downgrade?(response)
+ downgrade!
+ return send_traces(traces)
+ end
+ end
+ end.force
+
+ Datadog.health_metrics.transport_chunked(responses.size)
+
+ responses
+ end
+
+ def stats
+ @client.stats
+ end
+
+ def current_api
+ apis[@current_api_id]
+ end
+
+ private
+
+ def downgrade?(response)
+ return false unless apis.fallbacks.key?(@current_api_id)
+ response.not_found? || response.unsupported?
+ end
+
+ def downgrade!
+ downgrade_api_id = apis.fallbacks[@current_api_id]
+ raise NoDowngradeAvailableError, @current_api_id if downgrade_api_id.nil?
+ change_api!(downgrade_api_id)
+ end
+
+ def change_api!(api_id)
+ raise UnknownApiVersionError, api_id unless apis.key?(api_id)
+ @current_api_id = api_id
+ @client = HTTP::Client.new(current_api)
+ end
+
+ # Raised when configured with an unknown API version
+ class UnknownApiVersionError < StandardError
+ attr_reader :version
+
+ def initialize(version)
+ @version = version
+ end
+
+ def message
+ "No matching transport API for version #{version}!"
+ end
+ end
+
+ # Raised when configured with an unknown API version
+ class NoDowngradeAvailableError < StandardError
+ attr_reader :version
+
+ def initialize(version)
+ @version = version
+ end
+
+ def message
+ "No downgrade from transport API version #{version} is available!"
+ end
+ end
end
end
end
end