lib/opentracing/instrumentation/thrift/traced_protocol.rb in opentracing-instrumentation-0.1.7 vs lib/opentracing/instrumentation/thrift/traced_protocol.rb in opentracing-instrumentation-0.1.8
- old
+ new
@@ -1,9 +1,7 @@
# frozen_string_literal: true
-require 'thrift'
-
module OpenTracing
module Instrumentation
module Thrift
# TracedProtocol wrap any raw Thrift protocol instance.
# Not thread safe!
@@ -26,119 +24,117 @@
class TracedProtocol < ::Thrift::BaseProtocol
extend Forwardable
include ::Thrift::ProtocolDecorator
- attr_reader :protocol
+ WRITE_DIRECTION = 'write'
+ READ_DIRECTION = 'read'
- def initialize(protocol, config: Config.new)
- @protocol = protocol
- @config = config
+ def initialize(protocol, config: TracedProtocolConfig.new)
+ super(protocol)
+ @config = config.dup
+ yield @config if block_given?
+ @static_tags = config.tags_builder.build_static_tags(protocol)
end
def write_message_begin(name, type, seqid)
- request_tags = build_request_tags(name, type)
- self.write_scope = tracer.start_active_span(
- write_operation_name,
- tags: request_tags,
- )
- handler_error(write_scope.span, type)
+ self.write_scope = \
+ safe_start_active_span(WRITE_DIRECTION, name, type)
+ # Call protocol instaed super, beacaus ProtocolDecorator do not
+ # pass arguments to protocol.write_message_begin
protocol.write_message_begin(name, type, seqid)
+ rescue StandardError => e
+ write_error(write_scope&.span, e)
+ safe_close_span(write_scope)
+ raise e
end
def write_message_end
super
- write_scope.close
+ ensure
+ safe_close_span(write_scope)
end
def read_message_begin
- self.read_scope = tracer.start_active_span(read_operation_name)
+ start_time = Time.now.utc
name, type, rseqid = super
- tags = build_request_tags(name, type)
- write_span_tags(read_scope.span, tags)
- handler_error(read_scope.span, type)
+ self.read_scope = \
+ safe_start_active_span(READ_DIRECTION, name, type,
+ start_time: start_time)
[name, type, rseqid]
end
def read_message_end
super
- read_scope.close
+ ensure
+ safe_close_span(read_scope)
end
private
+ attr_reader :static_tags
+ attr_reader :protocol
+
def_delegators :@config,
:tracer,
- :write_operation_name,
- :read_operation_name
+ :tags_builder,
+ :operation_name_builder,
+ :error_writer,
+ :logger
attr_accessor :write_scope,
:read_scope
- METHOD_PART = 'method'
- SERVICE_NAME_PART = 'service_name'
- NAME_PATTER = /((?<service_name>\w+):)?(?<method>\w+)/.freeze
- MESSAGE_TYPES = {
- ::Thrift::MessageTypes::CALL => 'CALL',
- ::Thrift::MessageTypes::REPLY => 'REPLY',
- ::Thrift::MessageTypes::EXCEPTION => 'EXCEPTION',
- ::Thrift::MessageTypes::ONEWAY => 'ONEWAY',
- }.freeze
+ def safe_start_active_span(
+ direction,
+ name,
+ type,
+ start_time: Time.now.utc
+ )
+ operation_name = build_operation_name(direction, name, type)
+ request_tags = build_tags(name, type)
- def parse_name(name)
- name_matches = NAME_PATTER.match(name)
- method = name_matches[METHOD_PART]
- service_name = name_matches[SERVICE_NAME_PART]
- [service_name, method]
+ tracer.start_active_span(
+ operation_name,
+ tags: request_tags,
+ start_time: start_time,
+ )
+ rescue StandardError => e
+ logger&.error(e)
end
- def write_span_tags(span, tags)
- tags.each do |tag, value|
- span.set_tag(tag, value)
- end
- end
+ def safe_close_span(scope)
+ return if scope.nil?
- def build_request_tags(name, type)
- service_name, method = parse_name(name)
- {
- 'thrift.method' => method,
- 'thrift.service_name' => service_name,
- 'thrift.type' => MESSAGE_TYPES[type],
- 'thrift.multiplexed' => !service_name.nil?,
- 'thrift.protocol' => protocol_name,
- 'thrift.transport' => transport_name,
- }.compact
+ scope.close
+ rescue StandardError => e
+ logger&.error(e)
end
- def protocol_name
- protocol.class.to_s
- end
+ def write_error(span, exception)
+ return if span.nil?
- def build_transport_name(transport)
- inner_transport = transport.instance_variable_get(:@transport)
-
- if inner_transport
- inner_transport_name = build_transport_name(inner_transport)
- "#{transport.class}(#{inner_transport_name})"
- else
- transport.class.to_s
- end
+ error_writer.write_error(span, exception)
+ rescue StandardError => e
+ logger&.error(e)
end
- def transport_name
- return @transport_name if defined?(@transport_name)
-
- @transport_name = build_transport_name(@protocol.trans)
+ def build_operation_name(direction, name, type)
+ operation_name_builder.build_operation_name(
+ direction,
+ name,
+ type,
+ )
end
- def handler_error(span, type)
- return if type != ::Thrift::MessageTypes::EXCEPTION
-
- span.set_tag('error', true)
+ def build_tags(name, type)
+ static_tags
+ .merge(tags_builder.build_message_tags(name, type))
+ .compact
end
end
end
end
end