# frozen_string_literal: true module OpenTracing module Instrumentation module Thrift # TracedProtocol wrap any raw Thrift protocol instance. # Not thread safe! # # Usage (without multiplexed): # buffered_transport = ::Thrift::BufferedTransport.new(transport) # protocol = ::Thrift::BinaryProtocol.new(buffered_transport) # traced_protocol = \ # OpenTracing::Instrumentation::Thrift::TracedProtocol.new(protocol) # # Usage (multiplexed): # buffered_transport = ::Thrift::BufferedTransport.new(transport) # protocol = ::Thrift::BinaryProtocol.new(buffered_transport) # traced_protocol = # OpenTracing::Instrumentation::Thrift::TracedProtocol # .new(protocol) # multiplexed_protocol = # ::Thrift::MultiplexedProtocol # .new(traced_protocol, 'OrderService') class TracedProtocol < ::Thrift::BaseProtocol extend Forwardable include ::Thrift::ProtocolDecorator WRITE_DIRECTION = 'write' READ_DIRECTION = 'read' def initialize(protocol, config: TracedProtocolConfig.new) super(protocol) @config = config.dup yield @config if block_given? @protocol_tags = config.tags_builder.build_protocol_tags(protocol) end def ==(other) protocol == other.protocol && config == other.config && protocol_tags == other.protocol_tags end def write_message_begin(name, type, seqid) self.write_span = \ safe_start_span(WRITE_DIRECTION, name, type) # Call protocol instaed super, beacaus ProtocolDecorator do not # pass arguments to protocol.write_message_begin protocol.write_message_begin(name, type, seqid) rescue StandardError => e write_error(write_span, e) safe_close_span(write_span) raise e end def write_message_end super ensure safe_close_span(write_span) end def read_message_begin start_time = Time.now.utc name, type, rseqid = super self.read_span = \ safe_start_span(READ_DIRECTION, name, type, start_time: start_time) [name, type, rseqid] end def read_message_end super ensure safe_close_span(read_span) end protected attr_reader :protocol_tags, :protocol, :config private def_delegators :config, :tracer, :tags_builder, :operation_name_builder, :error_writer, :logger attr_accessor :write_span, :read_span def safe_start_span( direction, name, type, start_time: Time.now.utc ) operation_name = build_operation_name(direction, name, type) request_tags = build_tags(name, type) tracer.start_span( operation_name, tags: request_tags, start_time: start_time, ) rescue StandardError => e logger&.error(e) end def safe_close_span(span) return if span.nil? span.finish rescue StandardError => e logger&.error(e) end def write_error(span, exception) return if span.nil? error_writer.write_error(span, exception) rescue StandardError => e logger&.error(e) end def build_operation_name(direction, name, type) operation_name_builder.build_operation_name( direction, name, type, ) end def build_tags(name, type) protocol_tags .merge(tags_builder.build_message_tags(name, type)) .compact end end end end end