lib/opentelemetry/instrumentation/aws_sdk/handler.rb in aspecto-opentelemetry-instrumentation-aws_sdk-0.1.2 vs lib/opentelemetry/instrumentation/aws_sdk/handler.rb in aspecto-opentelemetry-instrumentation-aws_sdk-0.1.3

- old
+ new

@@ -12,95 +12,84 @@ SQS_SEND_MESSAGE = 'SQS.SendMessage' SQS_SEND_MESSAGE_BATCH = 'SQS.SendMessageBatch' SQS_RECEIVE_MESSAGE = 'SQS.ReceiveMessage' SNS_PUBLISH = 'SNS.Publish' - def call(context) - span_name = get_span_name(context) - attributes = get_span_attributes(context) + def call(context) # rubocop:disable Metrics/AbcSize, Metrics/MethodLength, Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity + return super unless context - tracer.in_span(span_name, kind: get_span_kind(context), attributes: attributes) do |span| - execute = proc { - inject_context(context) - super(context).tap do |response| - if (err = response.error) - span.record_exception(err) - span.status = Trace::Status.error(err) - end - end - } + service_name = context.client.class.api.metadata['serviceId'] || context.client.class.to_s.split('::')[1] + operation = context.operation&.name + client_method = "#{service_name}.#{operation}" + attributes = { + 'aws.region' => context.config.region, + OpenTelemetry::SemanticConventions::Trace::RPC_SYSTEM => 'aws-api', + OpenTelemetry::SemanticConventions::Trace::RPC_METHOD => operation, + OpenTelemetry::SemanticConventions::Trace::RPC_SERVICE => service_name + } + attributes[SemanticConventions::Trace::DB_SYSTEM] = 'dynamodb' if service_name == 'DynamoDB' + MessagingHelper.apply_sqs_attributes(attributes, context, client_method) if service_name == 'SQS' + MessagingHelper.apply_sns_attributes(attributes, context, client_method) if service_name == 'SNS' + tracer.in_span(span_name(context, client_method), attributes: attributes, kind: span_kind(service_name, operation)) do |span| + inject_context(context, client_method) if instrumentation_config[:suppress_internal_instrumentation] - OpenTelemetry::Common::Utilities.untraced(&execute) + OpenTelemetry::Common::Utilities.untraced { super } else - execute.call + super + end.tap do |response| + if (err = response.error) + span.record_exception(err) + span.status = Trace::Status.error(err) + end end end end - def inject_context(context) - client_method = get_client_method(context) - return unless [SQS_SEND_MESSAGE, SQS_SEND_MESSAGE_BATCH, SNS_PUBLISH].include? client_method + private - context.params[:message_attributes] ||= {} - OpenTelemetry.propagation.inject(context.params[:message_attributes], setter: MessageAttributeSetter) + def tracer + AwsSdk::Instrumentation.instance.tracer end - def get_span_attributes(context) - span_attributes = { - 'aws.region' => context.config.region, - OpenTelemetry::SemanticConventions::Trace::RPC_SYSTEM => 'aws-api', - OpenTelemetry::SemanticConventions::Trace::RPC_METHOD => get_operation(context), - OpenTelemetry::SemanticConventions::Trace::RPC_SERVICE => get_service_name(context) - } - - messaging_attributes = MessagingHelper.get_messaging_attributes(context, get_service_name(context), get_operation(context)) - db_attributes = DbHelper.get_db_attributes(context, get_service_name(context), get_operation(context)) - span_attributes.merge(messaging_attributes).merge(db_attributes) + def instrumentation_config + AwsSdk::Instrumentation.instance.config end - def get_service_name(context) - context&.client.class.api.metadata['serviceId'] || context&.client.class.to_s.split('::')[1] - end + def inject_context(context, client_method) + return unless [SQS_SEND_MESSAGE, SQS_SEND_MESSAGE_BATCH, SNS_PUBLISH].include? client_method - def get_operation(context) - context&.operation&.name + if client_method == SQS_SEND_MESSAGE_BATCH + context.params[:entries].each do |entry| + entry[:message_attributes] ||= {} + OpenTelemetry.propagation.inject(entry[:message_attributes], setter: MessageAttributeSetter) + end + else + context.params[:message_attributes] ||= {} + OpenTelemetry.propagation.inject(context.params[:message_attributes], setter: MessageAttributeSetter) + end end - def get_span_kind(context) - client_method = get_client_method(context) + def span_kind(service_name, client_method) case client_method when SQS_SEND_MESSAGE, SQS_SEND_MESSAGE_BATCH, SNS_PUBLISH OpenTelemetry::Trace::SpanKind::PRODUCER when SQS_RECEIVE_MESSAGE OpenTelemetry::Trace::SpanKind::CONSUMER else OpenTelemetry::Trace::SpanKind::CLIENT end end - def get_span_name(context) - client_method = get_client_method(context) + def span_name(context, client_method) case client_method when SQS_SEND_MESSAGE, SQS_SEND_MESSAGE_BATCH, SNS_PUBLISH - "#{MessagingHelper.get_queue_name(context)} send" + "#{MessagingHelper.queue_name(context)} send" when SQS_RECEIVE_MESSAGE - "#{MessagingHelper.get_queue_name(context)} receive" + "#{MessagingHelper.queue_name(context)} receive" else client_method end - end - - def get_client_method(context) - "#{get_service_name(context)}.#{get_operation(context)}" - end - - def tracer - AwsSdk::Instrumentation.instance.tracer - end - - def instrumentation_config - AwsSdk::Instrumentation.instance.config end end # A Seahorse::Client::Plugin that enables instrumentation for all AWS services class Plugin < Seahorse::Client::Plugin