lib/opentelemetry/instrumentation/aws_sdk/handler.rb in aspecto-opentelemetry-instrumentation-aws_sdk-0.1.0 vs lib/opentelemetry/instrumentation/aws_sdk/handler.rb in aspecto-opentelemetry-instrumentation-aws_sdk-0.1.1
- old
+ new
@@ -7,16 +7,22 @@
module OpenTelemetry
module Instrumentation
module AwsSdk
# Generates Spans for all interactions with AwsSdk
class Handler < Seahorse::Client::Handler
+ SQS_SEND_MESSAGE = 'SQS.SendMessage'
+ SQS_SEND_MESSAGE_BATCH = 'SQS.SendMessageBatch'
+ SQS_RECEIVE_MESSAGE = 'SQS.ReceiveMessage'
+ SNS_PUBLISH = 'SNS.Publish'
+
def call(context)
span_name = get_span_name(context)
attributes = get_span_attributes(context)
- tracer.in_span(span_name, kind: OpenTelemetry::Trace::SpanKind::CLIENT, attributes: attributes) do |span|
+ tracer.in_span(span_name, kind: get_span_kind(context), attributes: attributes) do |span|
execute = proc {
+ inject_context(context)
super(context).tap do |response|
if (err = response.error)
span.record_exception(err)
span.status = Trace::Status.error(err)
end
@@ -29,30 +35,63 @@
execute.call
end
end
end
+ def inject_context(context)
+ client_method = get_client_method(context)
+ return unless [SQS_SEND_MESSAGE, SQS_SEND_MESSAGE_BATCH, SNS_PUBLISH].include? client_method
+
+ context.params[:message_attributes] ||= {}
+ OpenTelemetry.propagation.inject(context.params[:message_attributes], setter: MessageAttributeSetter)
+ end
+
def get_span_attributes(context)
span_attributes = {
'aws.region' => context.config.region,
OpenTelemetry::SemanticConventions::Trace::RPC_SYSTEM => 'aws-api',
OpenTelemetry::SemanticConventions::Trace::RPC_METHOD => get_operation(context),
OpenTelemetry::SemanticConventions::Trace::RPC_SERVICE => get_service_name(context)
}
+ messaging_attributes = MessagingHelper.get_messaging_attributes(context, get_service_name(context), get_operation(context))
db_attributes = DbHelper.get_db_attributes(context, get_service_name(context), get_operation(context))
- span_attributes.merge(db_attributes)
+ span_attributes.merge(messaging_attributes).merge(db_attributes)
end
def get_service_name(context)
context&.client.class.api.metadata['serviceId'] || context&.client.class.to_s.split('::')[1]
end
def get_operation(context)
context&.operation&.name
end
+ def get_span_kind(context)
+ client_method = get_client_method(context)
+ case client_method
+ when SQS_SEND_MESSAGE, SQS_SEND_MESSAGE_BATCH, SNS_PUBLISH
+ OpenTelemetry::Trace::SpanKind::PRODUCER
+ when SQS_RECEIVE_MESSAGE
+ OpenTelemetry::Trace::SpanKind::CONSUMER
+ else
+ OpenTelemetry::Trace::SpanKind::CLIENT
+ end
+ end
+
def get_span_name(context)
+ client_method = get_client_method(context)
+ case client_method
+ when SQS_SEND_MESSAGE, SQS_SEND_MESSAGE_BATCH, SNS_PUBLISH
+ "#{MessagingHelper.get_queue_name(context)} send"
+ when SQS_RECEIVE_MESSAGE
+ "#{MessagingHelper.get_queue_name(context)} receive"
+ else
+ client_method
+ end
+ end
+
+ def get_client_method(context)
"#{get_service_name(context)}.#{get_operation(context)}"
end
def tracer
AwsSdk::Instrumentation.instance.tracer