Sha256: 54250b09435131235c06f10ae052870d7eb1ac4a60f47d12d8154b8cee241304

Contents?: true

Size: 1.58 KB

Versions: 5

Compression:

Stored size: 1.58 KB

Contents

# frozen_string_literal: true

require 'thread'

require_relative './collector/timestamp'
require_relative './collector/log_annotations'

module Zipkin
  class Collector
    OT_KIND_TO_ZIPKIN_KIND = {
      'server' => 'SERVER',
      'client' => 'CLIENT',
      'producer' => 'PRODUCER',
      'consumer' => 'CONSUMER'
    }.freeze

    def initialize(local_endpoint)
      @buffer = Buffer.new
      @local_endpoint = local_endpoint
    end

    def retrieve
      @buffer.retrieve
    end

    def send_span(span, end_time)
      finish_ts = Timestamp.create(end_time)
      start_ts = Timestamp.create(span.start_time)
      duration = finish_ts - start_ts
      return unless span.context.sampled?

      @buffer << {
        traceId: span.context.trace_id,
        id: span.context.span_id,
        parentId: span.context.parent_id,
        name: span.operation_name,
        kind: OT_KIND_TO_ZIPKIN_KIND[span.tags[:'span.kind'] || 'server'],
        timestamp: start_ts,
        duration: duration,
        debug: false,
        shared: false,
        localEndpoint: @local_endpoint,
        remoteEndpoint: Endpoint.remote_endpoint(span),
        annotations: LogAnnotations.build(span),
        tags: span.tags
      }
    end

    class Buffer
      def initialize
        @buffer = []
        @mutex = Mutex.new
      end

      def <<(element)
        @mutex.synchronize do
          @buffer << element
          true
        end
      end

      def retrieve
        @mutex.synchronize do
          elements = @buffer.dup
          @buffer.clear
          elements
        end
      end
    end
  end
end

Version data entries

5 entries across 5 versions & 1 rubygems

Version Path
zipkin-1.6.0 lib/zipkin/collector.rb
zipkin-1.5.2 lib/zipkin/collector.rb
zipkin-1.5.1 lib/zipkin/collector.rb
zipkin-1.5.0 lib/zipkin/collector.rb
zipkin-1.4.0 lib/zipkin/collector.rb