# frozen_string_literal: true

require 'forwardable'
require 'json'
require 'monitor'
require 'opentracing/instrumentation/mongo/query_sanitazer'

module OpenTracing
  module Instrumentation
    module Mongo
      # TraceSubscriber opens an active span when a mongo command starts and
      # closes it when the command succeeds or fails. Scopes are kept in
      # +scope_store+, keyed by the event's +operation_id+, and every access
      # to that store is guarded by a Monitor.
      class TraceSubscriber
        # Kept so the class's ancestor chain is unchanged; no delegators are
        # declared in this class.
        include Forwardable

        # Pattern handed to Kernel#format together with the named values
        # +database+, +collection+ and +command+. Named references are
        # required here: #build_operation_name passes a Hash, and positional
        # `%s` placeholders raise ArgumentError when given a single Hash.
        DEFAULT_OPERATION_NAME_PATTERN = \
          'mongo(collection=%<database>s.%<collection>s, command=%<command>s)'

        # Tag set on the span of a failed command.
        ERROR_TAG = 'error'

        # @param tracer tracer used to start spans
        #   (defaults to OpenTracing.global_tracer)
        # @param scope_store [Hash] storage for open scopes, keyed by
        #   operation id
        # @param operation_name_pattern [String] Kernel#format pattern; it may
        #   reference +database+, +collection+ and +command+ by name
        # @param sanitazer object responding to
        #   +sanitaze(command, command_name)+
        def initialize(
          tracer: OpenTracing.global_tracer,
          scope_store: {},
          operation_name_pattern: DEFAULT_OPERATION_NAME_PATTERN,
          sanitazer: QuerySanitazer.new
        )
          @tracer = tracer
          @monitor = Monitor.new
          @scope_store = scope_store
          @operation_name_pattern = operation_name_pattern
          @sanitazer = sanitazer
        end

        # Starts an active span for the command and remembers its scope
        # under the event's operation id.
        def started(event)
          scope = tracer.start_active_span(
            build_operation_name(event),
            tags: base_tags(event).merge(mongo_tags(event)),
          )

          store_scope(event.operation_id, scope)
        end

        # Closes the scope stored for the finished command.
        def succeeded(event)
          clean_scope(event.operation_id)
        end

        # Marks the stored span as failed, then closes its scope.
        def failed(event)
          tag_error(event.operation_id, event.message, event.failure)
        ensure
          # Close the scope even when tagging raised (e.g. the failure object
          # could not be serialized), otherwise the scope would stay open.
          clean_scope(event.operation_id)
        end

        private

        attr_reader :tracer
        attr_reader :scope_store
        attr_reader :monitor
        attr_reader :operation_name_pattern
        attr_reader :sanitazer

        # Renders the operation name from the configured pattern.
        def build_operation_name(event)
          command_name = event.command_name
          # The value stored under the command's own name is used as the
          # collection name.
          collection_name = event.command[command_name]

          format(
            operation_name_pattern,
            database: event.database_name,
            collection: collection_name,
            command: command_name,
          )
        end

        # Tags that do not depend on the command payload.
        def base_tags(event)
          {
            'component' => 'db',
            'db.type' => 'mongo',
            'db.instance' => event.address.to_s,
          }
        end

        # Tags describing the command; arguments are passed through the
        # sanitazer before being serialized to JSON.
        def mongo_tags(event)
          collection_name = event.command[event.command_name]
          command_args =
            sanitazer.sanitaze(event.command, event.command_name)

          {
            'mongo.request_id' => event.request_id,
            'mongo.operation_id' => event.operation_id,
            'mongo.database_name' => event.database_name,
            'mongo.collection_name' => collection_name,
            'mongo.command_name' => event.command_name,
            'mongo.command_args' => JSON.dump(command_args),
          }
        end

        # Remembers +scope+ under +operation_id+.
        def store_scope(operation_id, scope)
          monitor.synchronize do
            scope_store[operation_id] = scope
          end
        end

        # Removes the scope stored under +operation_id+ and closes it.
        # Does nothing when no scope is stored for that id.
        def clean_scope(operation_id)
          monitor.synchronize do
            scope_store.delete(operation_id)&.close
          end
        end

        # Sets the error tag and logs the failure on the span stored under
        # +operation_id+. Does nothing when no scope is stored for that id.
        def tag_error(operation_id, message, object)
          monitor.synchronize do
            scope = scope_store[operation_id]
            next if scope.nil?

            # Call both methods on the span itself instead of chaining, so
            # the code does not rely on what #set_tag returns.
            span = scope.span
            span.set_tag(ERROR_TAG, true)
            span.log_kv(
              'error.kind': message,
              'error.object': JSON.dump(object),
            )
          end
        end
      end
    end
  end
end