# frozen_string_literal: true require 'json' require 'opentracing/instrumentation/mongo/query_sanitazer' module OpenTracing module Instrumentation module Mongo # TraceSubscriber trace mongo requests class TraceSubscriber include Forwardable def initialize( tracer: OpenTracing.global_tracer, scope_store: {}, operation_name_builder: OperationNameBuilder.new, sanitazer: QuerySanitazer.new ) @tracer = tracer @monitor = Monitor.new @scope_store = scope_store @operation_name_builder = operation_name_builder @sanitazer = sanitazer end def started(event) scope = tracer.start_active_span( build_operation_name(event), tags: base_tags(event).merge(mongo_tags(event)), ) store_scope(event.operation_id, scope) end def succeeded(event) clean_scope(event.operation_id) end def failed(event) tag_error(event.operation_id, event.message, event.failure) clean_scope(event.operation_id) end private attr_reader :tracer, :scope_store, :monitor, :operation_name_builder, :sanitazer def build_operation_name(event) operation_name_builder.build_operation_name(event) end def base_tags(event) { 'component' => 'db', 'db.type' => 'mongo', 'db.instance' => event.address.to_s, } end def mongo_tags(event) collection_name = event.command[event.command_name] command_args = sanitazer.sanitaze(event.command, event.command_name) { 'mongo.request_id' => event.request_id, 'mongo.operation_id' => event.operation_id, 'mongo.database_name' => event.database_name, 'mongo.collection_name' => collection_name, 'mongo.command_name' => event.command_name, 'mongo.command_args' => JSON.dump(command_args), } end def store_scope(operation_id, scope) monitor.synchronize do scope_store[operation_id] = scope end end def clean_scope(operation_id) monitor.synchronize do scope_store .delete(operation_id) .close end end ERROR_TAG = 'error' def tag_error(operation_id, message, object) monitor.synchronize do scope_store[operation_id] .span .set_tag(ERROR_TAG, true) .log_kv( 'error.kind': message, 'error.object': JSON.dump(object), ) end end end end end end