# Forwards instrumentation events published through ActiveSupport::Notifications
# to Meter counters and histograms under the 'messaging.*' metric namespace.
#
# Each subscriber wraps the raw notification arguments in an
# ActiveSupport::Notifications::Event and reads the fields it needs from the
# payload. Payload keys read with a single-argument `fetch` are required: a
# missing key raises KeyError from inside the subscriber.
#
# The event-name patterns escape their dots so that only a literal '.' matches.
# They are intentionally left unanchored, so any event name that contains the
# pattern text is still handled.
# NOTE(review): the event names look like ruby-kafka instrumentation names —
# confirm against the publisher before tightening these to exact strings.

# Broker connection requests: only failed requests are counted.
ActiveSupport::Notifications.subscribe(/request\.connection\.kafka/) do |*args|
  event = ActiveSupport::Notifications::Event.new(*args)

  if event.payload.key?(:exception)
    tags = {
      api: event.payload.fetch(:api, 'unknown'),
      broker: event.payload.fetch(:broker_host)
    }

    Meter.increment('messaging.api.errors', tags: tags)
  end
end

# Single-message consumption: counts either one error or one processed message.
ActiveSupport::Notifications.subscribe(/process_message\.consumer\.kafka/) do |*args|
  event = ActiveSupport::Notifications::Event.new(*args)

  tags = {
    group_id: event.payload.fetch(:group_id),
    topic: event.payload.fetch(:topic),
    partition: event.payload.fetch(:partition)
  }

  if event.payload.key?(:exception)
    Meter.increment('messaging.consumer.process_message.errors', tags: tags)
  else
    Meter.increment('messaging.consumer.messages', tags: tags)
  end
end

# Batch consumption: a failed batch counts as one error; a successful batch
# adds its whole message count to the same counter used for single messages.
ActiveSupport::Notifications.subscribe(/process_batch\.consumer\.kafka/) do |*args|
  event = ActiveSupport::Notifications::Event.new(*args)
  message_count = event.payload.fetch(:message_count)

  tags = {
    group_id: event.payload.fetch(:group_id),
    topic: event.payload.fetch(:topic),
    partition: event.payload.fetch(:partition)
  }

  if event.payload.key?(:exception)
    Meter.increment('messaging.consumer.process_batch.errors', tags: tags)
  else
    Meter.increment('messaging.consumer.messages', value: message_count, tags: tags)
  end
end

# Producer buffering: counts produced messages and records buffer occupancy.
ActiveSupport::Notifications.subscribe(/produce_message\.producer\.kafka/) do |*args|
  event = ActiveSupport::Notifications::Event.new(*args)

  topic = event.payload.fetch(:topic)
  buffer_size = event.payload.fetch(:buffer_size)
  max_buffer_size = event.payload.fetch(:max_buffer_size)

  # This gets us the write rate.
  Meter.increment('messaging.producer.produce.messages', tags: { topic: topic })

  # This gets us the avg/max buffer size per producer.
  Meter.histogram('messaging.producer.buffer.size', buffer_size)

  # This gets us the avg/max buffer fill ratio per producer.
  # The ratio is skipped when the maximum is zero, because the float division
  # would produce NaN or Infinity instead of a meaningful sample.
  unless max_buffer_size.to_f.zero?
    buffer_fill_ratio = buffer_size.to_f / max_buffer_size.to_f
    Meter.histogram('messaging.producer.buffer.fill_ratio', buffer_fill_ratio)
  end
end

# Producer buffer overflow: counted as a produce error for the topic.
ActiveSupport::Notifications.subscribe(/buffer_overflow\.producer\.kafka/) do |*args|
  event = ActiveSupport::Notifications::Event.new(*args)
  tags = { topic: event.payload.fetch(:topic) }

  Meter.increment('messaging.producer.produce.errors', tags: tags)
end

# Producer delivery: records errors, delivered message count and attempts.
ActiveSupport::Notifications.subscribe(/deliver_messages\.producer\.kafka/) do |*args|
  event = ActiveSupport::Notifications::Event.new(*args)

  message_count = event.payload.fetch(:delivered_message_count)
  attempts = event.payload.fetch(:attempts)

  Meter.increment('messaging.producer.deliver.errors') if event.payload.key?(:exception)

  # Messages delivered to Kafka:
  Meter.increment('messaging.producer.deliver.messages', value: message_count)

  # Number of attempts to deliver messages:
  Meter.histogram('messaging.producer.deliver.attempts', attempts)
end

# Per-topic producer errors: counted as acknowledgement errors.
ActiveSupport::Notifications.subscribe(/topic_error\.producer\.kafka/) do |*args|
  event = ActiveSupport::Notifications::Event.new(*args)
  tags = { topic: event.payload.fetch(:topic) }

  Meter.increment('messaging.producer.ack.errors', tags: tags)
end

# Async producer queueing: records queue occupancy on every enqueue.
ActiveSupport::Notifications.subscribe(/enqueue_message\.async_producer\.kafka/) do |*args|
  event = ActiveSupport::Notifications::Event.new(*args)

  queue_size = event.payload.fetch(:queue_size)
  max_queue_size = event.payload.fetch(:max_queue_size)

  # This gets us the avg/max queue size per producer.
  Meter.histogram('messaging.async_producer.queue.size', queue_size)

  # This gets us the avg/max queue fill ratio per producer.
  # The ratio is skipped when the maximum is zero, because the float division
  # would produce NaN or Infinity instead of a meaningful sample.
  unless max_queue_size.to_f.zero?
    queue_fill_ratio = queue_size.to_f / max_queue_size.to_f
    Meter.histogram('messaging.async_producer.queue.fill_ratio', queue_fill_ratio)
  end
end

# Async producer buffer overflow: counted as a produce error for the topic.
ActiveSupport::Notifications.subscribe(/buffer_overflow\.async_producer\.kafka/) do |*args|
  event = ActiveSupport::Notifications::Event.new(*args)
  tags = { topic: event.payload.fetch(:topic) }

  Meter.increment('messaging.async_producer.produce.errors', tags: tags)
end