# encoding: utf-8
require "logstash/filters/base"
require "logstash/namespace"
require "logstash/event"

class LogStash::Filters::KafkaTimeMachine < LogStash::Filters::Base

  config_name "kafkatimemachine"

  public
  def register
  end

  public
  def filter(event)

    # Extract shipper data and check for validity; note that kafka_datacenter_shipper is used for both shipper and indexer arrays
    kafka_datacenter_shipper = event.get("[@metadata][kafka_datacenter_shipper]")
    kafka_topic_shipper = event.get("[@metadata][kafka_topic_shipper]")
    kafka_consumer_group_shipper = event.get("[@metadata][kafka_consumer_group_shipper]")
    kafka_append_time_shipper = Float(event.get("[@metadata][kafka_append_time_shipper]")) rescue nil
    logstash_kafka_read_time_shipper = Float(event.get("[@metadata][logstash_kafka_read_time_shipper]")) rescue nil

    kafka_shipper_array = Array[kafka_datacenter_shipper, kafka_topic_shipper, kafka_consumer_group_shipper, kafka_append_time_shipper, logstash_kafka_read_time_shipper]
    @logger.debug("kafka_shipper_array: #{kafka_shipper_array}")

    if (kafka_shipper_array.any? { |text| text.nil? || text.to_s.empty? })
      @logger.debug("kafka_shipper_array invalid: Found null")
      error_string_shipper = "Error in shipper data: #{kafka_shipper_array}"
      shipper_valid = false
    else
      @logger.debug("kafka_shipper_array valid")
      shipper_valid = true
      logstash_kafka_read_time_shipper = logstash_kafka_read_time_shipper.to_i
      kafka_append_time_shipper = kafka_append_time_shipper.to_i
      kafka_shipper_lag_ms = logstash_kafka_read_time_shipper - kafka_append_time_shipper
    end

    # Extract indexer data and check for validity
    kafka_topic_indexer = event.get("[@metadata][kafka_topic_indexer]")
    kafka_consumer_group_indexer = event.get("[@metadata][kafka_consumer_group_indexer]")
    kafka_append_time_indexer = Float(event.get("[@metadata][kafka_append_time_indexer]")) rescue nil
    logstash_kafka_read_time_indexer = Float(event.get("[@metadata][logstash_kafka_read_time_indexer]")) rescue nil

    kafka_indexer_array = Array[kafka_datacenter_shipper, kafka_topic_indexer, kafka_consumer_group_indexer, kafka_append_time_indexer, logstash_kafka_read_time_indexer]
    @logger.debug("kafka_indexer_array: #{kafka_indexer_array}")

    if (kafka_indexer_array.any? { |text| text.nil? || text.to_s.empty? })
      @logger.debug("kafka_indexer_array invalid: Found null")
      error_string_indexer = "Error in indexer data: #{kafka_indexer_array}"
      indexer_valid = false
    else
      @logger.debug("kafka_indexer_array valid")
      indexer_valid = true
      logstash_kafka_read_time_indexer = logstash_kafka_read_time_indexer.to_i
      kafka_append_time_indexer = kafka_append_time_indexer.to_i
      kafka_indexer_lag_ms = logstash_kafka_read_time_indexer - kafka_append_time_indexer
    end

    if (shipper_valid == true && indexer_valid == true)
      kafka_total_lag_ms = logstash_kafka_read_time_indexer - kafka_append_time_shipper

      event.set("[_ktm]", {
        "lag_total" => kafka_total_lag_ms,
        "lag_indexer" => kafka_indexer_lag_ms,
        "lag_shipper" => kafka_shipper_lag_ms,
        "datacenter_shipper" => kafka_datacenter_shipper,
        "kafka_topic_indexer" => kafka_topic_indexer,
        "kafka_consumer_group_indexer" => kafka_consumer_group_indexer,
        "kafka_topic_shipper" => kafka_topic_shipper,
        "kafka_consumer_group_shipper" => kafka_consumer_group_shipper,
        "tags" => ["ktm_lag_complete"]
      })
    elsif (shipper_valid == true && indexer_valid == false)
      event.set("[_ktm]", {
        "lag_shipper" => kafka_shipper_lag_ms,
        "datacenter_shipper" => kafka_datacenter_shipper,
        "kafka_topic_shipper" => kafka_topic_shipper,
        "kafka_consumer_group_shipper" => kafka_consumer_group_shipper,
        "tags" => ["ktm_lag_shipper"]
      })
    elsif (indexer_valid == true && shipper_valid == false)
      event.set("[_ktm]", {
        "lag_indexer" => kafka_indexer_lag_ms,
        "datacenter_shipper" => kafka_datacenter_shipper,
        "kafka_topic_indexer" => kafka_topic_indexer,
        "kafka_consumer_group_indexer" => kafka_consumer_group_indexer,
        "tags" => ["ktm_lag_indexer"]
      })
    elsif (indexer_valid == false && shipper_valid == false)
      @logger.error("Error kafkatimemachine: Could not build valid response --> #{error_string_shipper}, #{error_string_indexer}")
      # event.set("[_ktm]", {"error_shipper" => error_string_shipper, "error_indexer" => error_string_indexer, "datacenter_shipper" => kafka_datacenter_shipper, "tags" => ["ktm_error"] })
    end

    # filter_matched should go in the last line of our successful code
    filter_matched(event)

  end # def filter
end # class LogStash::Filters::KafkaTimeMachine
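# Usage sketch (an assumption, not part of the plugin source): this filter reads
# the [@metadata][*_shipper] and [@metadata][*_indexer] fields referenced above,
# which earlier pipeline stages are expected to populate (for example from a
# kafka input with event decoration plus mutate filters that copy the decorated
# values into these names), and writes the computed lag values under [_ktm].
# A minimal pipeline block might look like:
#
#   filter {
#     kafkatimemachine { }
#   }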