[![Gem Version](https://badge.fury.io/rb/logstash-filter-kafka_time_machine.svg)](https://badge.fury.io/rb/logstash-filter-kafka_time_machine) # Logstash Plugin: logstash-filter-kafka_time_machine This is a filter plugin for [Logstash](https://github.com/elastic/logstash) ## Description This filter plugin add additional fields to an existing event. These fields provide additional metadata for tracking log events that have traversed multiple kafka and logstash blocks for aggregation. The typical flow for log events: ``` Service Log ---> kafka_shipper <--- logstash_shipper ---> | ott_network_link | ---> kafka_indexer <--- logstash_indexer ---> elastic_search ``` This filter leverages metadata inserted into the log event on both `logstash_shipper` and `logstash_indexer` nodes to track dwell time of log events through this pipeline. When used the filter will add the following fields to the log event: | Event Field | Output Type | Description | | ----------------------------------- | ----------- | ----------------------------------------------------------------------------------------- | | [ktm][datacenter_shipper] | string | Echo of `kafka_datacenter_shipper | | [ktm][kafka_topic_shipper] | string | Echo of `kafka_topic_shipper | | [ktm][kafka_consumer_group_shipper] | string | Echo of `kafka_consumer_group_shipper | | [ktm][kafka_topic_indexer] | string | Echo of `kafka_topic_indexer | | [ktm][kafka_consumer_group_indexer] | string | Echo of `kafka_consumer_group_indexer | | [ktm][payload_size_bytes] | number | If present in event, `payload` field size in bytes, else it's omitted | | [ktm][lag_shipper_ms] | number | Value of "`logstash_kafka_read_time_shipper - kafka_append_time_shipper`" in milliseconds | | [ktm][lag_indexer_ms] | number | Value of "`logstash_kafka_read_time_indexer - kafka_append_time_indexer`" in milliseconds | | [ktm][lag_total_ms] | number | Value of "`logstash_kafka_read_time_indexer - kafka_append_time_shipper`" in milliseconds | What you do with the `[ktm]` event on it's return is use case specific. One currently used approach is to index this data in the ES document for log event it was generated against. This provides metrics on a per document level in ES. ## Kafka Time Machine Configuration Options This plugin supports the following configuration options: | Setting | Input Type | Required | | --------------------------------------------------------------------- | ---------- | -------- | | [kafka_datacenter_shipper](#kafka_datacenter_shipper) | string | Yes | | [kafka_topic_shipper](#kafka_topic_shipper) | string | Yes | | [kafka_consumer_group_shipper](#kafka_consumer_group_shipper) | string | Yes | | [kafka_append_time_shipper](#kafka_append_time_shipper) | string | Yes | | [logstash_kafka_read_time_shipper](#logstash_kafka_read_time_shipper) | string | Yes | | [kafka_topic_indexer](#kafka_topic_indexer) | string | Yes | | [kafka_consumer_group_indexer](#kafka_consumer_group_indexer) | string | Yes | | [kafka_append_time_indexer](#kafka_append_time_indexer) | string | Yes | | [logstash_kafka_read_time_indexer](#logstash_kafka_read_time_indexer) | string | Yes | ### kafka_datacenter_shipper - Value type is [string](https://www.elastic.co/guide/en/logstash/7.13/configuration-file-structure.html#string) - There is no default value for this setting. Provide datacenter that log event originated from; datacenter kafka_shipper is in. Field values can be static or dynamic: ``` filter { kafka_time_machine { kafka_datacenter_shipper => "static_field" } } ``` ``` filter { kafka_time_machine { kafka_datacenter_shipper => "%{[dynamic_field]}" } } ``` ### kafka_topic_shipper - Value type is [string](https://www.elastic.co/guide/en/logstash/7.13/configuration-file-structure.html#string) - There is no default value for this setting. Provide kafka topic log event was read from on shipper. Field values can be static or dynamic: ``` filter { kafka_time_machine { kafka_topic_shipper => "static_field" } } ``` ``` filter { kafka_time_machine { kafka_topic_shipper => "%{[dynamic_field]}" } } ``` ### kafka_consumer_group_shipper - Value type is [string](https://www.elastic.co/guide/en/logstash/7.13/configuration-file-structure.html#string) - There is no default value for this setting. Provide kafka consumer group log event was read from on shipper. Field values can be static or dynamic: ``` filter { kafka_time_machine { kafka_consumer_group_shipper => "static_field" } } ``` ``` filter { kafka_time_machine { kafka_consumer_group_shipper => "%{[dynamic_field]}" } } ``` ### kafka_append_time_shipper - Value type is [string](https://www.elastic.co/guide/en/logstash/7.13/configuration-file-structure.html#string) - There is no default value for this setting. Provide EPOCH time in milliseconds log event was added to `kafka_shipper`. Field values can be static or dynamic: ``` filter { kafka_time_machine { kafka_append_time_shipper => 1624394191000 } } ``` ``` filter { kafka_time_machine { kafka_append_time_shipper => "%{[dynamic_field]}" } } ``` ### logstash_kafka_read_time_shipper - Value type is [string](https://www.elastic.co/guide/en/logstash/7.13/configuration-file-structure.html#string) - There is no default value for this setting. Provide EPOCH time in milliseconds log event read from to `kafka_shipper`. Field values can be static or dynamic: ``` filter { kafka_time_machine { logstash_kafka_read_time_shipper => 1624394191000 } } ``` ``` filter { kafka_time_machine { logstash_kafka_read_time_shipper => "%{[dynamic_field]}" } } ``` ### kafka_topic_indexer - Value type is [string](https://www.elastic.co/guide/en/logstash/7.13/configuration-file-structure.html#string) - There is no default value for this setting. Provide kafka topic log event was read from on indexer. Field values can be static or dynamic: ``` filter { kafka_time_machine { kafka_topic_indexer => "static_field" } } ``` ``` filter { kafka_time_machine { kafka_topic_indexer => "%{[dynamic_field]}" } } ``` ### kafka_consumer_group_indexer - Value type is [string](https://www.elastic.co/guide/en/logstash/7.13/configuration-file-structure.html#string) - There is no default value for this setting. Provide kafka consumer group log event was read from on indexer. Field values can be static or dynamic: ``` filter { kafka_time_machine { kafka_consumer_group_indexer => "static_field" } } ``` ``` filter { kafka_time_machine { kafka_consumer_group_indexer => "%{[dynamic_field]}" } } ``` ### kafka_append_time_indexer - Value type is [string](https://www.elastic.co/guide/en/logstash/7.13/configuration-file-structure.html#string) - There is no default value for this setting. Provide EPOCH time in milliseconds log event was added to `kafka_indexer`. Field values can be static or dynamic: ``` filter { kafka_time_machine { kafka_append_time_indexer => 1624394191000 } } ``` ``` filter { kafka_time_machine { kafka_append_time_indexer => "%{[dynamic_field]}" } } ``` ### logstash_kafka_read_time_indexer - Value type is [string](https://www.elastic.co/guide/en/logstash/7.13/configuration-file-structure.html#string) - There is no default value for this setting. Provide EPOCH time in milliseconds log event read from to `kafka_indexer`. Field values can be static or dynamic: ``` filter { kafka_time_machine { logstash_kafka_read_time_indexer => 1624394191000 } } ``` ``` filter { kafka_time_machine { logstash_kafka_read_time_indexer => "%{[dynamic_field]}" } } ```