# encoding: utf-8
require "logstash/filters/base"
require "logstash/namespace"
require "logstash/event"

class LogStash::Filters::KafkaTimeMachine < LogStash::Filters::Base

  config_name "kafka_time_machine"

  # Datacenter the kafka message originated from.
  config :kafka_datacenter_shipper, :validate => :string, :required => true

  # Kafka Topic on shipper datacenter
  config :kafka_topic_shipper, :validate => :string, :required => true

  # Kafka Consumer Group on shipper datacenter
  config :kafka_consumer_group_shipper, :validate => :string, :required => true

  # Time message was appended to kafka on shipper datacenter
  config :kafka_append_time_shipper, :validate => :string, :required => true

  # Time message read from kafka by logstash on shipper datacenter
  config :logstash_kafka_read_time_shipper, :validate => :string, :required => true

  # Kafka Topic on indexer datacenter
  config :kafka_topic_indexer, :validate => :string, :required => true

  # Kafka Consumer Group on indexer datacenter
  config :kafka_consumer_group_indexer, :validate => :string, :required => true

  # Time message was appended to kafka on indexer datacenter
  config :kafka_append_time_indexer, :validate => :string, :required => true

  # Time message read from kafka by logstash on indexer datacenter
  config :logstash_kafka_read_time_indexer, :validate => :string, :required => true

  public
  def register
  end

  public
  def filter(event)

    # Extract shipper data and check for validity; note that kafka_datacenter_shipper
    # is used for both the shipper and indexer arrays.
    kafka_datacenter_shipper = event.sprintf(@kafka_datacenter_shipper)
    kafka_topic_shipper = event.sprintf(@kafka_topic_shipper)
    kafka_consumer_group_shipper = event.sprintf(@kafka_consumer_group_shipper)
    kafka_append_time_shipper = Float(event.sprintf(@kafka_append_time_shipper)) rescue nil
    logstash_kafka_read_time_shipper = Float(event.sprintf(@logstash_kafka_read_time_shipper)) rescue nil

    kafka_shipper_array = Array[kafka_datacenter_shipper, kafka_topic_shipper, kafka_consumer_group_shipper,
                                kafka_append_time_shipper, logstash_kafka_read_time_shipper]
    @logger.debug("kafka_shipper_array: #{kafka_shipper_array}")

    if (kafka_shipper_array.any? { |text| text.nil? || text.to_s.empty? })
      @logger.debug("kafka_shipper_array invalid: Found null")
      error_string_shipper = "Error in shipper data: #{kafka_shipper_array}"
      shipper_valid = false
    else
      @logger.debug("kafka_shipper_array valid")
      shipper_valid = true
      logstash_kafka_read_time_shipper = logstash_kafka_read_time_shipper.to_i
      kafka_append_time_shipper = kafka_append_time_shipper.to_i
      kafka_shipper_lag_ms = logstash_kafka_read_time_shipper - kafka_append_time_shipper
    end
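    # A worked example of the shipper lag math above, assuming the append/read
    # timestamps arrive as epoch milliseconds (values are hypothetical):
    #
    #   kafka_append_time_shipper        = 1633046400000
    #   logstash_kafka_read_time_shipper = 1633046400250
    #   kafka_shipper_lag_ms             = 1633046400250 - 1633046400000 = 250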
    # Extract indexer data and check for validity
    kafka_topic_indexer = event.sprintf(@kafka_topic_indexer)
    kafka_consumer_group_indexer = event.sprintf(@kafka_consumer_group_indexer)
    kafka_append_time_indexer = Float(event.sprintf(@kafka_append_time_indexer)) rescue nil
    logstash_kafka_read_time_indexer = Float(event.sprintf(@logstash_kafka_read_time_indexer)) rescue nil

    kafka_indexer_array = Array[kafka_datacenter_shipper, kafka_topic_indexer, kafka_consumer_group_indexer,
                                kafka_append_time_indexer, logstash_kafka_read_time_indexer]
    @logger.debug("kafka_indexer_array: #{kafka_indexer_array}")

    if (kafka_indexer_array.any? { |text| text.nil? || text.to_s.empty? })
      @logger.debug("kafka_indexer_array invalid: Found null")
      error_string_indexer = "Error in indexer data: #{kafka_indexer_array}"
      indexer_valid = false
    else
      @logger.debug("kafka_indexer_array valid")
      indexer_valid = true
      logstash_kafka_read_time_indexer = logstash_kafka_read_time_indexer.to_i
      kafka_append_time_indexer = kafka_append_time_indexer.to_i
      kafka_indexer_lag_ms = logstash_kafka_read_time_indexer - kafka_append_time_indexer
    end

    # Build the [ktm] result; which fields are emitted depends on which legs
    # of the pipeline produced valid data.
    if (shipper_valid == true && indexer_valid == true)
      kafka_total_lag_ms = logstash_kafka_read_time_indexer - kafka_append_time_shipper
      event.set("[ktm]", {
        "lag_total_ms" => kafka_total_lag_ms,
        "lag_indexer_ms" => kafka_indexer_lag_ms,
        "lag_shipper_ms" => kafka_shipper_lag_ms,
        "datacenter_shipper" => kafka_datacenter_shipper,
        "kafka_topic_indexer" => kafka_topic_indexer,
        "kafka_consumer_group_indexer" => kafka_consumer_group_indexer,
        "kafka_topic_shipper" => kafka_topic_shipper,
        "kafka_consumer_group_shipper" => kafka_consumer_group_shipper,
        "tags" => ["ktm_lag_total"]
      })
    elsif (shipper_valid == true && indexer_valid == false)
      event.set("[ktm]", {
        "lag_shipper_ms" => kafka_shipper_lag_ms,
        "datacenter_shipper" => kafka_datacenter_shipper,
        "kafka_topic_shipper" => kafka_topic_shipper,
        "kafka_consumer_group_shipper" => kafka_consumer_group_shipper,
        "tags" => ["ktm_lag_shipper"]
      })
    elsif (indexer_valid == true && shipper_valid == false)
      event.set("[ktm]", {
        "lag_indexer_ms" => kafka_indexer_lag_ms,
        "datacenter_shipper" => kafka_datacenter_shipper,
        "kafka_topic_indexer" => kafka_topic_indexer,
        "kafka_consumer_group_indexer" => kafka_consumer_group_indexer,
        "tags" => ["ktm_lag_indexer"]
      })
    elsif (indexer_valid == false && shipper_valid == false)
      @logger.debug("Error kafka_time_machine: Could not build valid response --> #{error_string_shipper}, #{error_string_indexer}")
    end

    # Add in the size of the payload field; calling to_s first guards against
    # a NoMethodError should the field hold a non-string value.
    payload = event.get("[payload]")
    if payload
      event.set("[ktm][payload_size_bytes]", payload.to_s.bytesize)
    end

    # filter_matched should go in the last line of our successful code
    filter_matched(event)

  end # def filter

end # class LogStash::Filters::KafkaTimeMachine
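# A minimal pipeline sketch showing how this filter might be wired up. The
# %{...} field references below are assumptions for illustration; substitute
# whatever metadata your kafka input and shipper pipeline actually attach to
# the event.
#
#   filter {
#     kafka_time_machine {
#       kafka_datacenter_shipper         => "%{[@metadata][datacenter_shipper]}"
#       kafka_topic_shipper              => "%{[@metadata][kafka_topic_shipper]}"
#       kafka_consumer_group_shipper     => "%{[@metadata][kafka_consumer_group_shipper]}"
#       kafka_append_time_shipper        => "%{[@metadata][kafka_append_time_shipper]}"
#       logstash_kafka_read_time_shipper => "%{[@metadata][logstash_kafka_read_time_shipper]}"
#       kafka_topic_indexer              => "%{[@metadata][kafka][topic]}"
#       kafka_consumer_group_indexer     => "%{[@metadata][kafka][consumer_group]}"
#       kafka_append_time_indexer        => "%{[@metadata][kafka][timestamp]}"
#       logstash_kafka_read_time_indexer => "%{[@metadata][logstash_kafka_read_time_indexer]}"
#     }
#   }
#
# When both shipper and indexer data are valid, the event gains a [ktm] field
# along these lines (hypothetical values):
#
#   "ktm" => {
#     "lag_total_ms"                 => 1250,
#     "lag_shipper_ms"               => 250,
#     "lag_indexer_ms"               => 1000,
#     "datacenter_shipper"           => "dc1",
#     "kafka_topic_shipper"          => "logs",
#     "kafka_consumer_group_shipper" => "logstash_shipper",
#     "kafka_topic_indexer"          => "logs_aggregate",
#     "kafka_consumer_group_indexer" => "logstash_indexer",
#     "tags"                         => ["ktm_lag_total"]
#   }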