lib/opentelemetry/instrumentation/aws_sdk/handler.rb in opentelemetry-instrumentation-aws_sdk-0.1.0 vs lib/opentelemetry/instrumentation/aws_sdk/handler.rb in opentelemetry-instrumentation-aws_sdk-0.2.0

- old
+ new

@@ -7,32 +7,41 @@ module OpenTelemetry module Instrumentation module AwsSdk # Generates Spans for all interactions with AwsSdk class Handler < Seahorse::Client::Handler - def call(context) # rubocop:disable Metrics/AbcSize, Metrics/MethodLength + SQS_SEND_MESSAGE = 'SQS.SendMessage' + SQS_SEND_MESSAGE_BATCH = 'SQS.SendMessageBatch' + SQS_RECEIVE_MESSAGE = 'SQS.ReceiveMessage' + SNS_PUBLISH = 'SNS.Publish' + + def call(context) # rubocop:disable Metrics/AbcSize, Metrics/MethodLength, Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity return super unless context - service_name = context.client.class.api.metadata['serviceId'] || context.client.class.to_s.split('::')[1] + service_name = service_name(context) operation = context.operation&.name + client_method = "#{service_name}.#{operation}" attributes = { 'aws.region' => context.config.region, OpenTelemetry::SemanticConventions::Trace::RPC_SYSTEM => 'aws-api', OpenTelemetry::SemanticConventions::Trace::RPC_METHOD => operation, OpenTelemetry::SemanticConventions::Trace::RPC_SERVICE => service_name } attributes[SemanticConventions::Trace::DB_SYSTEM] = 'dynamodb' if service_name == 'DynamoDB' + MessagingHelper.apply_sqs_attributes(attributes, context, client_method) if service_name == 'SQS' + MessagingHelper.apply_sns_attributes(attributes, context, client_method) if service_name == 'SNS' - tracer.in_span("#{service_name}.#{operation}", attributes: attributes, kind: OpenTelemetry::Trace::SpanKind::CLIENT) do |span| + tracer.in_span(span_name(context, client_method), attributes: attributes, kind: span_kind(client_method)) do |span| + inject_context(context, client_method) if instrumentation_config[:suppress_internal_instrumentation] OpenTelemetry::Common::Utilities.untraced { super } else super end.tap do |response| if (err = response.error) span.record_exception(err) - span.status = Trace::Status.error(err) + span.status = Trace::Status.error(err.to_s) end end end end @@ -42,9 +51,54 @@ AwsSdk::Instrumentation.instance.tracer end def instrumentation_config AwsSdk::Instrumentation.instance.config + end + + def service_name(context) # rubocop:disable Metrics/AbcSize + # Support aws-sdk v2.0.x, which 'metadata' has a setter method only + return context.client.class.to_s.split('::')[1] if ::Seahorse::Model::Api.instance_method(:metadata).parameters.length.positive? + + context.client.class.api.metadata['serviceId'] || context.client.class.to_s.split('::')[1] + end + + SEND_MESSAGE_CLIENT_METHODS = [SQS_SEND_MESSAGE, SQS_SEND_MESSAGE_BATCH, SNS_PUBLISH].freeze + def inject_context(context, client_method) + return unless SEND_MESSAGE_CLIENT_METHODS.include? client_method + return unless instrumentation_config[:inject_messaging_context] + + if client_method == SQS_SEND_MESSAGE_BATCH + context.params[:entries].each do |entry| + entry[:message_attributes] ||= {} + OpenTelemetry.propagation.inject(entry[:message_attributes], setter: MessageAttributeSetter) + end + else + context.params[:message_attributes] ||= {} + OpenTelemetry.propagation.inject(context.params[:message_attributes], setter: MessageAttributeSetter) + end + end + + def span_kind(client_method) + case client_method + when SQS_SEND_MESSAGE, SQS_SEND_MESSAGE_BATCH, SNS_PUBLISH + OpenTelemetry::Trace::SpanKind::PRODUCER + when SQS_RECEIVE_MESSAGE + OpenTelemetry::Trace::SpanKind::CONSUMER + else + OpenTelemetry::Trace::SpanKind::CLIENT + end + end + + def span_name(context, client_method) + case client_method + when SQS_SEND_MESSAGE, SQS_SEND_MESSAGE_BATCH, SNS_PUBLISH + "#{MessagingHelper.queue_name(context)} send" + when SQS_RECEIVE_MESSAGE + "#{MessagingHelper.queue_name(context)} receive" + else + client_method + end end end # A Seahorse::Client::Plugin that enables instrumentation for all AWS services class Plugin < Seahorse::Client::Plugin