lib/opentracing/instrumentation/thrift/traced_protocol.rb in opentracing-instrumentation-0.1.7 vs lib/opentracing/instrumentation/thrift/traced_protocol.rb in opentracing-instrumentation-0.1.8

- old
+ new

@@ -1,9 +1,7 @@ # frozen_string_literal: true -require 'thrift' - module OpenTracing module Instrumentation module Thrift # TracedProtocol wrap any raw Thrift protocol instance. # Not thread safe! @@ -26,119 +24,117 @@ class TracedProtocol < ::Thrift::BaseProtocol extend Forwardable include ::Thrift::ProtocolDecorator - attr_reader :protocol + WRITE_DIRECTION = 'write' + READ_DIRECTION = 'read' - def initialize(protocol, config: Config.new) - @protocol = protocol - @config = config + def initialize(protocol, config: TracedProtocolConfig.new) + super(protocol) + @config = config.dup + yield @config if block_given? + @static_tags = config.tags_builder.build_static_tags(protocol) end def write_message_begin(name, type, seqid) - request_tags = build_request_tags(name, type) - self.write_scope = tracer.start_active_span( - write_operation_name, - tags: request_tags, - ) - handler_error(write_scope.span, type) + self.write_scope = \ + safe_start_active_span(WRITE_DIRECTION, name, type) + # Call protocol instaed super, beacaus ProtocolDecorator do not + # pass arguments to protocol.write_message_begin protocol.write_message_begin(name, type, seqid) + rescue StandardError => e + write_error(write_scope&.span, e) + safe_close_span(write_scope) + raise e end def write_message_end super - write_scope.close + ensure + safe_close_span(write_scope) end def read_message_begin - self.read_scope = tracer.start_active_span(read_operation_name) + start_time = Time.now.utc name, type, rseqid = super - tags = build_request_tags(name, type) - write_span_tags(read_scope.span, tags) - handler_error(read_scope.span, type) + self.read_scope = \ + safe_start_active_span(READ_DIRECTION, name, type, + start_time: start_time) [name, type, rseqid] end def read_message_end super - read_scope.close + ensure + safe_close_span(read_scope) end private + attr_reader :static_tags + attr_reader :protocol + def_delegators :@config, :tracer, - :write_operation_name, - :read_operation_name + :tags_builder, + :operation_name_builder, + :error_writer, + :logger attr_accessor :write_scope, :read_scope - METHOD_PART = 'method' - SERVICE_NAME_PART = 'service_name' - NAME_PATTER = /((?<service_name>\w+):)?(?<method>\w+)/.freeze - MESSAGE_TYPES = { - ::Thrift::MessageTypes::CALL => 'CALL', - ::Thrift::MessageTypes::REPLY => 'REPLY', - ::Thrift::MessageTypes::EXCEPTION => 'EXCEPTION', - ::Thrift::MessageTypes::ONEWAY => 'ONEWAY', - }.freeze + def safe_start_active_span( + direction, + name, + type, + start_time: Time.now.utc + ) + operation_name = build_operation_name(direction, name, type) + request_tags = build_tags(name, type) - def parse_name(name) - name_matches = NAME_PATTER.match(name) - method = name_matches[METHOD_PART] - service_name = name_matches[SERVICE_NAME_PART] - [service_name, method] + tracer.start_active_span( + operation_name, + tags: request_tags, + start_time: start_time, + ) + rescue StandardError => e + logger&.error(e) end - def write_span_tags(span, tags) - tags.each do |tag, value| - span.set_tag(tag, value) - end - end + def safe_close_span(scope) + return if scope.nil? - def build_request_tags(name, type) - service_name, method = parse_name(name) - { - 'thrift.method' => method, - 'thrift.service_name' => service_name, - 'thrift.type' => MESSAGE_TYPES[type], - 'thrift.multiplexed' => !service_name.nil?, - 'thrift.protocol' => protocol_name, - 'thrift.transport' => transport_name, - }.compact + scope.close + rescue StandardError => e + logger&.error(e) end - def protocol_name - protocol.class.to_s - end + def write_error(span, exception) + return if span.nil? - def build_transport_name(transport) - inner_transport = transport.instance_variable_get(:@transport) - - if inner_transport - inner_transport_name = build_transport_name(inner_transport) - "#{transport.class}(#{inner_transport_name})" - else - transport.class.to_s - end + error_writer.write_error(span, exception) + rescue StandardError => e + logger&.error(e) end - def transport_name - return @transport_name if defined?(@transport_name) - - @transport_name = build_transport_name(@protocol.trans) + def build_operation_name(direction, name, type) + operation_name_builder.build_operation_name( + direction, + name, + type, + ) end - def handler_error(span, type) - return if type != ::Thrift::MessageTypes::EXCEPTION - - span.set_tag('error', true) + def build_tags(name, type) + static_tags + .merge(tags_builder.build_message_tags(name, type)) + .compact end end end end end