lib/jaeger/client/udp_sender.rb in jaeger-client-0.1.0 vs lib/jaeger/client/udp_sender.rb in jaeger-client-0.2.0
- old
+ new
@@ -1,130 +1,50 @@
+require_relative './udp_sender/transport'
require 'jaeger/thrift/agent'
require 'thread'
module Jaeger
module Client
class UdpSender
- def initialize(service_name, host, port)
+ def initialize(service_name:, host:, port:, collector:, flush_interval:)
@service_name = service_name
+ @collector = collector
+ @flush_interval = flush_interval
- emitter = SocketEmitter.new(host, port)
- emitter.start
- transport = Transport.new(emitter)
+ transport = Transport.new(host, port)
protocol = ::Thrift::CompactProtocol.new(transport)
-
@client = Jaeger::Thrift::Agent::Client.new(protocol)
end
- def send_span(span, end_time)
- context = span.context
- start_ts, duration = build_timestamps(span, end_time)
+ def start
+ # Sending spans in a separate thread to avoid blocking the main thread.
+ @thread = Thread.new do
+ loop do
+ emit_batch(@collector.retrieve)
+ sleep @flush_interval
+ end
+ end
+ end
- thrift_span = Jaeger::Thrift::Span.new(
- 'traceIdLow' => context.trace_id,
- 'traceIdHigh' => context.trace_id,
- 'spanId' => context.span_id,
- 'parentSpanId' => context.parent_id || 0,
- 'operationName' => span.operation_name,
- 'references' => [],
- 'flags' => context.flags,
- 'startTime' => start_ts,
- 'duration' => duration,
- 'tags' => build_tags(span.tags),
- 'logs' => build_logs(span.logs)
- )
+ def stop
+ @thread.terminate if @thread
+ emit_batch(@collector.retrieve)
+ end
+
+ private
+
+ def emit_batch(thrift_spans)
+ return if thrift_spans.empty?
+
batch = Jaeger::Thrift::Batch.new(
'process' => Jaeger::Thrift::Process.new(
'serviceName' => @service_name,
'tags' => [],
),
- 'spans' => [thrift_span]
+ 'spans' => thrift_spans
)
@client.emitBatch(batch)
- end
-
- private
-
- def build_tags(tags)
- tags.map {|name, value| build_tag(name, value)}
- end
-
- def build_logs(logs)
- logs.map do |timestamp:, fields:|
- Jaeger::Thrift::Log.new(
- 'timestamp' => (timestamp.to_f * 1_000_000).to_i,
- 'fields' => fields.map {|name, value| build_tag(name, value)}
- )
- end
- end
-
- def build_tag(name, value)
- Jaeger::Thrift::Tag.new(
- 'key' => name.to_s,
- 'vType' => Jaeger::Thrift::TagType::STRING,
- 'vStr' => value.to_s
- )
- end
-
- def build_timestamps(span, end_time)
- start_ts = (span.start_time.to_f * 1_000_000).to_i
- end_ts = (end_time.to_f * 1_000_000).to_i
- duration = end_ts - start_ts
- [start_ts, duration]
- end
-
- class Transport
- def initialize(emitter)
- @emitter = emitter
- @buffer = ::Thrift::MemoryBufferTransport.new
- end
-
- def write(str)
- @buffer.write(str)
- end
-
- def flush
- @emitter.emit(@buffer.read(@buffer.available))
- @buffer.reset_buffer
- end
-
- def open; end
- def close; end
- end
-
- class SocketEmitter
- FLAGS = 0
-
- def initialize(host, port)
- @socket = UDPSocket.new
- @socket.connect(host, port)
- @encoded_spans = Queue.new
- end
-
- def emit(encoded_spans)
- @encoded_spans << encoded_spans
- end
-
- def start
- # Sending spans in a separate thread to avoid blocking the main thread.
- Thread.new do
- while encoded_span = @encoded_spans.pop
- send_bytes(encoded_span)
- end
- end
- end
-
- private
-
- def send_bytes(bytes)
- @socket.send(bytes, FLAGS)
- @socket.flush
- rescue Errno::ECONNREFUSED
- warn 'Unable to connect to Jaeger Agent'
- rescue => e
- warn "Unable to send spans: #{e.message}"
- end
end
end
end
end