# frozen_string_literal: true require 'thrift' module OpenTracing module Instrumentation module Thrift # TracedProtocol wrap any raw Thrift protocol instance. # Not thread safe! # # Usage (without multiplexed): # buffered_transport = ::Thrift::BufferedTransport.new(transport) # protocol = ::Thrift::BinaryProtocol.new(buffered_transport) # traced_protocol = \ # OpenTracing::Instrumentation::Thrift::TracedProtocol.new(protocol) # # Usage (multiplexed): # buffered_transport = ::Thrift::BufferedTransport.new(transport) # protocol = ::Thrift::BinaryProtocol.new(buffered_transport) # traced_protocol = # OpenTracing::Instrumentation::Thrift::TracedProtocol # .new(protocol) # multiplexed_protocol = # ::Thrift::MultiplexedProtocol # .new(traced_protocol, 'OrderService') class TracedProtocol < ::Thrift::BaseProtocol extend Forwardable include ::Thrift::ProtocolDecorator attr_reader :protocol def initialize(protocol, config: Config.new) @protocol = protocol @config = config end def write_message_begin(name, type, seqid) request_tags = build_request_tags(name, type) self.write_scope = tracer.start_active_span( write_operation_name, tags: request_tags, ) handler_error(write_scope.span, type) protocol.write_message_begin(name, type, seqid) end def write_message_end super write_scope.close end def read_message_begin self.read_scope = tracer.start_active_span(read_operation_name) name, type, rseqid = super tags = build_request_tags(name, type) write_span_tags(read_scope.span, tags) handler_error(read_scope.span, type) [name, type, rseqid] end def read_message_end super read_scope.close end private def_delegators :@config, :tracer, :write_operation_name, :read_operation_name attr_accessor :write_scope, :read_scope METHOD_PART = 'method' SERVICE_NAME_PART = 'service_name' NAME_PATTER = /((?\w+):)?(?\w+)/.freeze MESSAGE_TYPES = { ::Thrift::MessageTypes::CALL => 'CALL', ::Thrift::MessageTypes::REPLY => 'REPLY', ::Thrift::MessageTypes::EXCEPTION => 'EXCEPTION', ::Thrift::MessageTypes::ONEWAY => 'ONEWAY', }.freeze def parse_name(name) name_matches = NAME_PATTER.match(name) method = name_matches[METHOD_PART] service_name = name_matches[SERVICE_NAME_PART] [service_name, method] end def write_span_tags(span, tags) tags.each do |tag, value| span.set_tag(tag, value) end end def build_request_tags(name, type) service_name, method = parse_name(name) { 'thrift.method' => method, 'thrift.service_name' => service_name, 'thrift.type' => MESSAGE_TYPES[type], 'thrift.multiplexed' => !service_name.nil?, 'thrift.protocol' => protocol_name, 'thrift.transport' => transport_name, }.compact end def protocol_name protocol.class.to_s end def build_transport_name(transport) inner_transport = transport.instance_variable_get(:@transport) if inner_transport inner_transport_name = build_transport_name(inner_transport) "#{transport.class}(#{inner_transport_name})" else transport.class.to_s end end def transport_name return @transport_name if defined?(@transport_name) @transport_name = build_transport_name(@protocol.trans) end def handler_error(span, type) return if type != ::Thrift::MessageTypes::EXCEPTION span.set_tag('error', true) end end end end end