# typed: ignore

# Copyright (c) 2015 Sqreen. All Rights Reserved.
# Please refer to our terms for more information: https://www.sqreen.com/terms.html

require 'sqreen/ecosystem/module_api'
require 'sqreen/ecosystem/module_api/instrumentation'
require 'sqreen/ecosystem/module_api/message_producer'
require 'sqreen/ecosystem/module_api/tracing/consumer_data'
require 'sqreen/ecosystem/module_api/tracing/producer_data'

module Sqreen
  module Ecosystem
    module Messaging
      # Hooks Kafka::Broker#produce and Kafka::Broker#fetch_messages and, after
      # each call, builds one tracing data object per topic involved
      # (ProducerData for produce, ConsumerData for fetch_messages).
      class Kafka
        include ModuleApi::Loggable
        include ModuleApi::Instrumentation
        include ModuleApi::MessageProducer

        # Registers the two `after` advices on Kafka::Broker.
        # Both advices are wrapped first, then both hooks are installed.
        def setup
          producer_advice = wrap_for_interest(
            ModuleApi::Tracing::ProducerData, &method(:after_send)
          )
          consumer_advice = wrap_for_interest(
            ModuleApi::Tracing::ConsumerData, &method(:after_receive)
          )

          instrument 'Kafka::Broker#produce', after: producer_advice
          instrument 'Kafka::Broker#fetch_messages', after: consumer_advice
        end

        private

        # Advice run after Kafka::Broker#produce.
        #
        # Returns nil when the call raised or when it did not receive exactly
        # one argument; otherwise returns one ProducerData per key of the
        # :messages_for_topics entry of that single argument.
        #
        # @param [Sqreen::Graft::CallbackCall] call
        # @param _ball unused
        def after_send(call, _ball)
          return if call.raised

          unless call.args.length == 1
            logger.info "Expected 1 arguments to Kafka::Broker#produce"
            return
          end

          produce_opts = call.args.first
          topic_names = produce_opts[:messages_for_topics].keys
          create_signal_data(
            ModuleApi::Tracing::ProducerData, call.instance, topic_names
          )
        end

        # Advice run after Kafka::Broker#fetch_messages.
        #
        # Returns nil when the call raised or when it did not receive exactly
        # one argument; otherwise returns one ConsumerData per key of the
        # :topics entry of that single argument.
        #
        # @param [Sqreen::Graft::CallbackCall] call
        # @param _ball unused
        def after_receive(call, _ball)
          return if call.raised

          unless call.args.length == 1
            logger.info "Expected 1 arguments to Kafka::Broker#fetch_messages"
            return
          end

          fetch_opts = call.args.first
          topic_names = fetch_opts[:topics].keys
          create_signal_data(
            ModuleApi::Tracing::ConsumerData, call.instance, topic_names
          )
        end

        # Builds one `clazz` instance per topic, each tagged with
        # message_type :kafka and the broker's host.
        #
        # @param [Class] clazz
        # @param [Kafka::Broker] broker
        # @param [Array] topics
        # @return [Array] one `clazz` instance per element of `topics`
        def create_signal_data(clazz, broker, topics)
          # The host is read straight from the broker's @host instance variable.
          broker_host = broker.instance_variable_get(:@host)

          topics.map do |topic_name|
            clazz.new(message_type: :kafka, host: broker_host, topic: topic_name)
          end
        end
      end
    end
  end
end