# encoding: utf-8
require "logstash/filters/base"
require "logstash/namespace"
require "logstash/event"
require "json"

class LogStash::Filters::KafkaTimeMachine < LogStash::Filters::Base

  config_name "kafka_time_machine"

  # Datacenter the kafka message originated from.
  config :kafka_datacenter_shipper, :validate => :string, :required => true

  # Kafka topic on shipper datacenter
  config :kafka_topic_shipper, :validate => :string, :required => true

  # Kafka consumer group on shipper datacenter
  config :kafka_consumer_group_shipper, :validate => :string, :required => true

  # Time message was appended to kafka on shipper datacenter
  config :kafka_append_time_shipper, :validate => :string, :required => true

  # Time message read from kafka by logstash on shipper datacenter
  config :logstash_kafka_read_time_shipper, :validate => :string, :required => true

  # Kafka topic on indexer datacenter
  config :kafka_topic_indexer, :validate => :string, :required => true

  # Kafka consumer group on indexer datacenter
  config :kafka_consumer_group_indexer, :validate => :string, :required => true

  # Time message was appended to kafka on indexer datacenter
  config :kafka_append_time_indexer, :validate => :string, :required => true

  # Time message read from kafka by logstash on indexer datacenter
  config :logstash_kafka_read_time_indexer, :validate => :string, :required => true

  # Owner of the event currently being processed.
  config :event_owner, :validate => :string, :required => true

  # Current time since epoch in ms that should be set in the generated metric
  config :event_time_ms, :validate => :string, :required => true

  # Elasticsearch cluster the event is destined for
  config :elasticsearch_cluster, :validate => :string, :required => true

  # Elasticsearch cluster index the event is destined for
  config :elasticsearch_cluster_index, :validate => :string, :required => true

  public
  def register
  end

  public
  def filter(event)

    @logger.debug("Starting filter calculations")

    # Note: we considered erroring out on unresolved sprintf strings such as
    # "%{[@metadata][ktm][kafka_datacenter_shipper]}". However, such a string being present is a
    # good way to identify shipper/indexer logstash configs that are wrong, so it is allowed to
    # pass through unaltered.
    #
    # Extract all string values to local variables.
    event_owner = event.sprintf(@event_owner)
    shipper_kafka_datacenter = event.sprintf(@kafka_datacenter_shipper)
    shipper_kafka_topic = event.sprintf(@kafka_topic_shipper)
    shipper_kafka_consumer_group = event.sprintf(@kafka_consumer_group_shipper)
    indexer_kafka_topic = event.sprintf(@kafka_topic_indexer)
    indexer_kafka_consumer_group = event.sprintf(@kafka_consumer_group_indexer)
    elasticsearch_cluster = event.sprintf(@elasticsearch_cluster)
    elasticsearch_cluster_index = event.sprintf(@elasticsearch_cluster_index)
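    # For example, a pipeline would typically set these options to field references such as
    # "%{[@metadata][ktm][kafka_topic_shipper]}" (the field paths here are illustrative, not
    # mandated by the plugin); event.sprintf resolves each reference against the current event,
    # and an unresolvable reference passes through as the literal string per the note above.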
    # Extract all the "time" related values to local variables. These need special handling due to
    # the Float() operation: we must first check for a valid numeric value, otherwise Float()
    # raises an error and stops the logstash pipeline.
    event_time_ms = get_numeric(event.sprintf(@event_time_ms))
    shipper_kafka_append_time = get_numeric(event.sprintf(@kafka_append_time_shipper))
    shipper_logstash_kafka_read_time = get_numeric(event.sprintf(@logstash_kafka_read_time_shipper))
    indexer_kafka_append_time = get_numeric(event.sprintf(@kafka_append_time_indexer))
    indexer_logstash_kafka_read_time = get_numeric(event.sprintf(@logstash_kafka_read_time_indexer))

    # Validate the shipper data
    shipper_kafka_array = Array[shipper_kafka_datacenter, shipper_kafka_topic, shipper_kafka_consumer_group, shipper_kafka_append_time, shipper_logstash_kafka_read_time, event_owner, event_time_ms, elasticsearch_cluster, elasticsearch_cluster_index]
    if (shipper_kafka_array.any? { |text| text.nil? || text.to_s.empty? })
      @logger.debug("shipper_kafka_array invalid: Found null")
      error_string_shipper = sprintf("Error in shipper data: %s", shipper_kafka_array)
      @logger.debug(error_string_shipper)
      shipper_valid = false
    else
      @logger.debug("shipper_kafka_array valid")
      shipper_valid = true
      shipper_logstash_kafka_read_time = shipper_logstash_kafka_read_time.to_i
      shipper_kafka_append_time = shipper_kafka_append_time.to_i
      shipper_kafka_lag_ms = shipper_logstash_kafka_read_time - shipper_kafka_append_time
    end

    # Validate the indexer data
    indexer_kafka_array = Array[shipper_kafka_datacenter, indexer_kafka_topic, indexer_kafka_consumer_group, indexer_kafka_append_time, indexer_logstash_kafka_read_time, event_owner, event_time_ms, elasticsearch_cluster, elasticsearch_cluster_index]
    if (indexer_kafka_array.any? { |text| text.nil? || text.to_s.empty? })
      @logger.debug("indexer_kafka_array invalid: Found null")
      error_string_indexer = sprintf("Error in indexer data: %s", indexer_kafka_array)
      @logger.debug(error_string_indexer)
      indexer_valid = false
    else
      @logger.debug("indexer_kafka_array valid")
      indexer_valid = true
      indexer_logstash_kafka_read_time = indexer_logstash_kafka_read_time.to_i
      indexer_kafka_append_time = indexer_kafka_append_time.to_i
      indexer_kafka_lag_ms = indexer_logstash_kafka_read_time - indexer_kafka_append_time
    end

    # Add in the size of the payload field
    payload_size_bytes = 0
    if event.get("[payload]")
      payload_size_bytes = event.get("[payload]").bytesize
    end

    # Set time (nanoseconds) for event that is generated
    epoch_time_ns = nil
    if (event_time_ms != nil)
      epoch_time_ns = event_time_ms * 1000000
    end

    # Create array to hold one or more ktm metric events
    ktm_metric_event_array = Array.new

    # Populate the event and set tags
    if (shipper_valid == true && indexer_valid == true && epoch_time_ns != nil)
      total_kafka_lag_ms = indexer_logstash_kafka_read_time - shipper_kafka_append_time
      point_ktm = create_point_ktm(shipper_kafka_datacenter, event_owner, payload_size_bytes, "total", total_kafka_lag_ms, epoch_time_ns, elasticsearch_cluster, elasticsearch_cluster_index)
      ktm_metric_event_array.push point_ktm

    elsif (shipper_valid == true && indexer_valid == false && epoch_time_ns != nil)
      point_ktm = create_point_ktm(shipper_kafka_datacenter, event_owner, payload_size_bytes, "shipper", shipper_kafka_lag_ms, epoch_time_ns, elasticsearch_cluster, elasticsearch_cluster_index)
      ktm_metric_event_array.push point_ktm

      point_ktm = create_point_ktm_error(shipper_kafka_datacenter, event_owner, epoch_time_ns, "indexer", elasticsearch_cluster, elasticsearch_cluster_index)
      ktm_metric_event_array.push point_ktm
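    # Mirror case of the branch above: only the indexer data was complete, so emit the indexer
    # lag metric plus an error point recording that the shipper data was missing or invalid.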
    elsif (indexer_valid == true && shipper_valid == false && epoch_time_ns != nil)
      point_ktm = create_point_ktm(shipper_kafka_datacenter, event_owner, payload_size_bytes, "indexer", indexer_kafka_lag_ms, epoch_time_ns, elasticsearch_cluster, elasticsearch_cluster_index)
      ktm_metric_event_array.push point_ktm

      point_ktm = create_point_ktm_error(shipper_kafka_datacenter, event_owner, epoch_time_ns, "shipper", elasticsearch_cluster, elasticsearch_cluster_index)
      ktm_metric_event_array.push point_ktm

    elsif (indexer_valid == false && shipper_valid == false)
      point_ktm = create_point_ktm_error(shipper_kafka_datacenter, event_owner, epoch_time_ns, "insufficient_data", elasticsearch_cluster, elasticsearch_cluster_index)
      ktm_metric_event_array.push point_ktm

      error_string = sprintf("Error kafka_time_machine: Could not build valid response --> %s, %s", error_string_shipper, error_string_indexer)
      @logger.debug(error_string)

    else
      point_ktm = create_point_ktm_error(shipper_kafka_datacenter, event_owner, epoch_time_ns, "unknown", elasticsearch_cluster, elasticsearch_cluster_index)
      ktm_metric_event_array.push point_ktm

      error_string = "Unknown error encountered"
      @logger.debug(error_string)
    end

    # Publish each metric event in our array
    ktm_metric_event_array.each do |metric_event|
      # Create new event for KTM metric
      event_ktm = LogStash::Event.new(metric_event)
      event_ktm.set("[@metadata][ktm_tags][ktm_metric]", "true")
      filter_matched(event_ktm)
      yield event_ktm
    end

  end # def filter

  # Creates hash with ktm data point to return
  public
  def create_point_ktm(datacenter, event_owner, payload_size_bytes, lag_type, lag_ms, epoch_time_ns, elasticsearch_cluster, elasticsearch_cluster_index)

    point = Hash.new

    # Name of point and time created
    point["name"] = "ktm"
    point["epoch_time_ns"] = epoch_time_ns

    # tags
    point["datacenter"] = datacenter
    point["owner"] = event_owner
    point["lag_type"] = lag_type
    point["es_cluster"] = elasticsearch_cluster
    point["es_cluster_index"] = elasticsearch_cluster_index

    # fields
    point["payload_size_bytes"] = payload_size_bytes
    point["lag_ms"] = lag_ms

    return point

  end # def create_point_ktm

  # Creates hash with ktm error data point to return
  public
  def create_point_ktm_error(datacenter, event_owner, epoch_time_ns, type, elasticsearch_cluster, elasticsearch_cluster_index)

    # Check for nil values
    if (nil == datacenter)
      datacenter = "unknown"
    end

    if (nil == event_owner)
      event_owner = "unknown"
    end

    # Set time if we didn't receive it
    if (nil == epoch_time_ns)
      epoch_time_ns = ((Time.now.to_f * 1000).to_i) * 1000000
    end

    point = Hash.new

    # Name of point and time created
    point["name"] = "ktm_error"
    point["epoch_time_ns"] = epoch_time_ns

    # tags
    point["datacenter"] = datacenter
    point["owner"] = event_owner
    point["source"] = type
    point["es_cluster"] = elasticsearch_cluster
    point["es_cluster_index"] = elasticsearch_cluster_index

    # fields
    point["count"] = 1

    return point

  end # def create_point_ktm_error

  # Ensures the provided value is numeric; if not, returns nil
  public
  def get_numeric(input_str)

    @logger.debug("get_numeric operating on: #{input_str}")

    is_numeric = input_str.to_s.match(/\A[+-]?\d+?(\.\d+)?\Z/) == nil ? false : true
    if (true == is_numeric)
      @logger.debug("get_numeric - valid value provided")
      num_value = Float(input_str)
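      # Timestamps are expected to be positive epoch values, so the check below rejects zero as
      # well as negative input.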
@logger.debug("get_numeric - negative value provided") num_value = nil end else @logger.debug("get_numeric - invalid value provided") num_value = nil end @logger.debug(sprintf("get_numeric response --> #{num_value}")) return num_value end # def get_numberic end # class LogStash::Filters::KafkaTimeMachine