lib/opentelemetry/instrumentation/aws_sdk/handler.rb in aspecto-opentelemetry-instrumentation-aws_sdk-0.1.2 vs lib/opentelemetry/instrumentation/aws_sdk/handler.rb in aspecto-opentelemetry-instrumentation-aws_sdk-0.1.3
- old
+ new
@@ -12,95 +12,84 @@
SQS_SEND_MESSAGE = 'SQS.SendMessage'
SQS_SEND_MESSAGE_BATCH = 'SQS.SendMessageBatch'
SQS_RECEIVE_MESSAGE = 'SQS.ReceiveMessage'
SNS_PUBLISH = 'SNS.Publish'
- def call(context)
- span_name = get_span_name(context)
- attributes = get_span_attributes(context)
+ def call(context) # rubocop:disable Metrics/AbcSize, Metrics/MethodLength, Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity
+ return super unless context
- tracer.in_span(span_name, kind: get_span_kind(context), attributes: attributes) do |span|
- execute = proc {
- inject_context(context)
- super(context).tap do |response|
- if (err = response.error)
- span.record_exception(err)
- span.status = Trace::Status.error(err)
- end
- end
- }
+ service_name = context.client.class.api.metadata['serviceId'] || context.client.class.to_s.split('::')[1]
+ operation = context.operation&.name
+ client_method = "#{service_name}.#{operation}"
+ attributes = {
+ 'aws.region' => context.config.region,
+ OpenTelemetry::SemanticConventions::Trace::RPC_SYSTEM => 'aws-api',
+ OpenTelemetry::SemanticConventions::Trace::RPC_METHOD => operation,
+ OpenTelemetry::SemanticConventions::Trace::RPC_SERVICE => service_name
+ }
+ attributes[SemanticConventions::Trace::DB_SYSTEM] = 'dynamodb' if service_name == 'DynamoDB'
+ MessagingHelper.apply_sqs_attributes(attributes, context, client_method) if service_name == 'SQS'
+ MessagingHelper.apply_sns_attributes(attributes, context, client_method) if service_name == 'SNS'
+ tracer.in_span(span_name(context, client_method), attributes: attributes, kind: span_kind(service_name, operation)) do |span|
+ inject_context(context, client_method)
if instrumentation_config[:suppress_internal_instrumentation]
- OpenTelemetry::Common::Utilities.untraced(&execute)
+ OpenTelemetry::Common::Utilities.untraced { super }
else
- execute.call
+ super
+ end.tap do |response|
+ if (err = response.error)
+ span.record_exception(err)
+ span.status = Trace::Status.error(err)
+ end
end
end
end
- def inject_context(context)
- client_method = get_client_method(context)
- return unless [SQS_SEND_MESSAGE, SQS_SEND_MESSAGE_BATCH, SNS_PUBLISH].include? client_method
+ private
- context.params[:message_attributes] ||= {}
- OpenTelemetry.propagation.inject(context.params[:message_attributes], setter: MessageAttributeSetter)
+ def tracer
+ AwsSdk::Instrumentation.instance.tracer
end
- def get_span_attributes(context)
- span_attributes = {
- 'aws.region' => context.config.region,
- OpenTelemetry::SemanticConventions::Trace::RPC_SYSTEM => 'aws-api',
- OpenTelemetry::SemanticConventions::Trace::RPC_METHOD => get_operation(context),
- OpenTelemetry::SemanticConventions::Trace::RPC_SERVICE => get_service_name(context)
- }
-
- messaging_attributes = MessagingHelper.get_messaging_attributes(context, get_service_name(context), get_operation(context))
- db_attributes = DbHelper.get_db_attributes(context, get_service_name(context), get_operation(context))
- span_attributes.merge(messaging_attributes).merge(db_attributes)
+ def instrumentation_config
+ AwsSdk::Instrumentation.instance.config
end
- def get_service_name(context)
- context&.client.class.api.metadata['serviceId'] || context&.client.class.to_s.split('::')[1]
- end
+ def inject_context(context, client_method)
+ return unless [SQS_SEND_MESSAGE, SQS_SEND_MESSAGE_BATCH, SNS_PUBLISH].include? client_method
- def get_operation(context)
- context&.operation&.name
+ if client_method == SQS_SEND_MESSAGE_BATCH
+ context.params[:entries].each do |entry|
+ entry[:message_attributes] ||= {}
+ OpenTelemetry.propagation.inject(entry[:message_attributes], setter: MessageAttributeSetter)
+ end
+ else
+ context.params[:message_attributes] ||= {}
+ OpenTelemetry.propagation.inject(context.params[:message_attributes], setter: MessageAttributeSetter)
+ end
end
- def get_span_kind(context)
- client_method = get_client_method(context)
+ def span_kind(service_name, client_method)
case client_method
when SQS_SEND_MESSAGE, SQS_SEND_MESSAGE_BATCH, SNS_PUBLISH
OpenTelemetry::Trace::SpanKind::PRODUCER
when SQS_RECEIVE_MESSAGE
OpenTelemetry::Trace::SpanKind::CONSUMER
else
OpenTelemetry::Trace::SpanKind::CLIENT
end
end
- def get_span_name(context)
- client_method = get_client_method(context)
+ def span_name(context, client_method)
case client_method
when SQS_SEND_MESSAGE, SQS_SEND_MESSAGE_BATCH, SNS_PUBLISH
- "#{MessagingHelper.get_queue_name(context)} send"
+ "#{MessagingHelper.queue_name(context)} send"
when SQS_RECEIVE_MESSAGE
- "#{MessagingHelper.get_queue_name(context)} receive"
+ "#{MessagingHelper.queue_name(context)} receive"
else
client_method
end
- end
-
- def get_client_method(context)
- "#{get_service_name(context)}.#{get_operation(context)}"
- end
-
- def tracer
- AwsSdk::Instrumentation.instance.tracer
- end
-
- def instrumentation_config
- AwsSdk::Instrumentation.instance.config
end
end
# A Seahorse::Client::Plugin that enables instrumentation for all AWS services
class Plugin < Seahorse::Client::Plugin