[![Gem Version](https://badge.fury.io/rb/logstash-filter-kafka_time_machine.svg)](https://badge.fury.io/rb/logstash-filter-kafka_time_machine)
[![Build Status](https://drone.lma.wbx2.com/api/badges/post-deployment/logstash-filter-kafka_time_machine/status.svg)](https://drone.lma.wbx2.com/post-deployment/logstash-filter-kafka_time_machine)

# Logstash Plugin: logstash-filter-kafka_time_machine

This is a filter plugin for [Logstash](https://github.com/elastic/logstash).

## Description

This filter plugin generates new events in the Logstash pipeline. The new events are built from fields that are extracted from the original event and passed to the filter. They carry metrics, suitable for aggregation, about log events that have traversed multiple Kafka and Logstash blocks.

The typical flow for log events:

```
Service Log ---> kafka_shipper <--- logstash_shipper ---> | ott_network_link | ---> kafka_indexer <--- logstash_indexer ---> elastic_search
```

The filter leverages metadata inserted into the log event on both `logstash_shipper` and `logstash_indexer` nodes to track the dwell time of log events through this pipeline.
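
As an illustration of how that metadata might be inserted on a `logstash_shipper` node, here is a minimal sketch. It assumes the Kafka input runs with `decorate_events` enabled; the broker address, topic, group, datacenter name, and the `[ktm][...]` field names are hypothetical placeholders rather than names this plugin defines.

```
input {
  kafka {
    bootstrap_servers => "kafka-shipper.example.com:9092"  # hypothetical broker
    topics            => ["service_logs"]                  # hypothetical topic
    group_id          => "logstash_shipper"                # hypothetical consumer group
    decorate_events   => "basic"                           # older Kafka input versions use true
  }
}

filter {
  # Record when Logstash read the event from kafka_shipper (epoch milliseconds).
  ruby {
    code => "event.set('[ktm][logstash_kafka_read_time_shipper]', (Time.now.to_f * 1000).to_i)"
  }

  # [@metadata] is not serialized by outputs, so copy the Kafka metadata into
  # regular fields that travel with the event to the indexer side.
  mutate {
    add_field => {
      "[ktm][kafka_datacenter_shipper]"     => "dc-example"
      "[ktm][kafka_topic_shipper]"          => "%{[@metadata][kafka][topic]}"
      "[ktm][kafka_consumer_group_shipper]" => "%{[@metadata][kafka][consumer_group]}"
      "[ktm][kafka_append_time_shipper]"    => "%{[@metadata][kafka][timestamp]}"
    }
  }
}
```

Any field layout works, as long as the indexer-side pipeline can reference the same fields when it configures the filter.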
## Kafka Time Machine Result

When the `kafka_time_machine` executes, it returns an [InfluxDB Line Protocol](https://docs.influxdata.com/influxdb/v1.8/write_protocols/line_protocol_tutorial/) formatted metric, e.g.:

```
ktm,datacenter=kafka_datacenter_shipper-test,es_cluster=some_cluster_name,es_cluster_index=some_cluster_index_name,lag_type=total,owner=ktm_test@cisco.com lag_ms=300i,payload_size_bytes=40i 1634662795000000000
```

The plugin also emits a metric if an error was encountered, e.g.:

```
ktm_error,datacenter=kafka_datacenter_shipper-test,es_cluster=some_cluster_name,es_cluster_index=some_cluster_index_name,owner=ktm_test@cisco.com,source=shipper count=1i 1634662795000000000
```

To ensure a Logstash `output{}` block can properly route this metric, the new events are tagged with a `[@metadata][ktm_tags][ktm_metric]` field, e.g.:

```
{
    "ktm_metric" => "ktm,datacenter=kafka_datacenter_shipper-test,lag_type=total,es_cluster=some_cluster_name,es_cluster_index=some_cluster_index_name,owner=ktm_test@cisco.com lag_ms=300i,payload_size_bytes=40i 1634662795000000000",
    "@timestamp" => 2021-10-20T23:46:24.704Z,
    "@metadata" => {
        "ktm_tags" => {
            "ktm_metric" => "true"
        }
    },
    "@version" => "1"
}
```

### Metric Event Breakdown

The `kafka_time_machine` can insert one or more new events in the pipeline.
The `ktm_metric` created will be one of:

- `ktm`
- `ktm_error`

In the case of `ktm` the metric breakdown is:

| Line Protocol Element | Line Protocol Type | Description                                 |
| --------------------- | ------------------ | ------------------------------------------- |
| datacenter            | tag                | Echo of `kafka_datacenter_shipper`          |
| es_cluster            | tag                | Echo of `elasticsearch_cluster`             |
| es_cluster_index      | tag                | Echo of `elasticsearch_cluster_index`       |
| lag_type              | tag                | Calculated lag type                         |
| owner                 | tag                | Echo of `event_owner`                       |
| lag_ms                | field              | Calculated lag in milliseconds              |
| payload_size_bytes    | field              | Calculated size of `payload` field in bytes |

Meaning of `lag_type`:

- `total`: Lag calculated includes dwell time on both shipper and indexer.
- `indexer`: Lag calculated is dwell time for indexer only. Insufficient data provided for shipper to compute `total` lag.
- `shipper`: Lag calculated is dwell time for shipper only. Insufficient data provided for indexer to compute `total` lag.

In the case of `ktm_error` the metric breakdown is:

| Line Protocol Element | Line Protocol Type | Description                           |
| --------------------- | ------------------ | ------------------------------------- |
| datacenter            | tag                | Echo of `kafka_datacenter_shipper`    |
| es_cluster            | tag                | Echo of `elasticsearch_cluster`       |
| es_cluster_index      | tag                | Echo of `elasticsearch_cluster_index` |
| source                | tag                | Source of the error metric            |
| owner                 | tag                | Echo of `event_owner`                 |
| count                 | field              | Count to track error; not cumulative  |

Meaning of `source`:

- `indexer`: Insufficient data provided for indexer to compute `total` lag.
- `shipper`: Insufficient data provided for shipper to compute `total` lag.
- `insufficient_data`: Insufficient data provided for both indexer and shipper to compute `total` lag.
- `unknown`: Unknown error encountered.

### Metric Event Timestamp

When the `kafka_time_machine` generates the [InfluxDB Line Protocol](https://docs.influxdata.com/influxdb/v1.8/write_protocols/line_protocol_tutorial/) metric, it must also set the timestamp on the metric. To give the caller of the filter control over this value, the `event_time_ms` configuration option sets the metric timestamp. For example, if `event_time_ms` is provided as `1634662795000`, the resulting metric (with the timestamp expressed in nanoseconds) would be:

```
ktm,datacenter=kafka_datacenter_shipper-test,lag_type=total,owner=ktm_test@cisco.com lag_ms=300i,payload_size_bytes=40i 1634662795000000000
```

## Kafka Time Machine Configuration Options

This plugin requires the following configuration options:

| Setting                                                               | Input Type | Required |
| --------------------------------------------------------------------- | ---------- | -------- |
| [kafka_datacenter_shipper](#kafka_datacenter_shipper)                 | string     | Yes      |
| [kafka_topic_shipper](#kafka_topic_shipper)                           | string     | Yes      |
| [kafka_consumer_group_shipper](#kafka_consumer_group_shipper)         | string     | Yes      |
| [kafka_append_time_shipper](#kafka_append_time_shipper)               | string     | Yes      |
| [logstash_kafka_read_time_shipper](#logstash_kafka_read_time_shipper) | string     | Yes      |
| [kafka_topic_indexer](#kafka_topic_indexer)                           | string     | Yes      |
| [kafka_consumer_group_indexer](#kafka_consumer_group_indexer)         | string     | Yes      |
| [kafka_append_time_indexer](#kafka_append_time_indexer)               | string     | Yes      |
| [logstash_kafka_read_time_indexer](#logstash_kafka_read_time_indexer) | string     | Yes      |
| [event_owner](#event_owner)                                           | string     | Yes      |
| [event_time_ms](#event_time_ms)                                       | string     | Yes      |
| [elasticsearch_cluster](#elasticsearch_cluster)                       | string     | Yes      |
| [elasticsearch_cluster_index](#elasticsearch_cluster_index)           | string     | Yes      |

> Why are all settings required?
>
>> This was a design decision based on the use case. Tracking a Kafka "lag by time" metric without knowing the topic and consumer group would be essentially useless.
>> By leveraging the [Kafka input `decorate_events`](https://www.elastic.co/guide/en/logstash/current/plugins-inputs-kafka.html#_metadata_fields) feature, we know we'll always have the required fields.
>>
>> While they are required, they can be passed as empty strings. The plugin handles these cases; e.g. if `kafka_consumer_group_shipper` is an empty string, the plugin returns only `indexer` results.

### kafka_datacenter_shipper

- Value type is [string](https://www.elastic.co/guide/en/logstash/7.13/configuration-file-structure.html#string)
- There is no default value for this setting.

Provide the datacenter that the log event originated from, i.e. the datacenter that `kafka_shipper` is in. Field values can be static or dynamic:

```
filter {
  kafka_time_machine {
    kafka_datacenter_shipper => "static_field"
  }
}
```

```
filter {
  kafka_time_machine {
    kafka_datacenter_shipper => "%{[dynamic_field]}"
  }
}
```

### kafka_topic_shipper

- Value type is [string](https://www.elastic.co/guide/en/logstash/7.13/configuration-file-structure.html#string)
- There is no default value for this setting.

Provide the Kafka topic that the log event was read from on the shipper. Field values can be static or dynamic:

```
filter {
  kafka_time_machine {
    kafka_topic_shipper => "static_field"
  }
}
```

```
filter {
  kafka_time_machine {
    kafka_topic_shipper => "%{[dynamic_field]}"
  }
}
```

### kafka_consumer_group_shipper

- Value type is [string](https://www.elastic.co/guide/en/logstash/7.13/configuration-file-structure.html#string)
- There is no default value for this setting.

Provide the Kafka consumer group that the log event was read from on the shipper. Field values can be static or dynamic:

```
filter {
  kafka_time_machine {
    kafka_consumer_group_shipper => "static_field"
  }
}
```

```
filter {
  kafka_time_machine {
    kafka_consumer_group_shipper => "%{[dynamic_field]}"
  }
}
```

### kafka_append_time_shipper

- Value type is [string](https://www.elastic.co/guide/en/logstash/7.13/configuration-file-structure.html#string)
- There is no default value for this setting.


Provide the epoch time, in milliseconds, at which the log event was added to `kafka_shipper`. Field values can be static or dynamic:

```
filter {
  kafka_time_machine {
    kafka_append_time_shipper => 1624394191000
  }
}
```

```
filter {
  kafka_time_machine {
    kafka_append_time_shipper => "%{[dynamic_field]}"
  }
}
```

### logstash_kafka_read_time_shipper

- Value type is [string](https://www.elastic.co/guide/en/logstash/7.13/configuration-file-structure.html#string)
- There is no default value for this setting.

Provide the epoch time, in milliseconds, at which the log event was read from `kafka_shipper`. Field values can be static or dynamic:

```
filter {
  kafka_time_machine {
    logstash_kafka_read_time_shipper => 1624394191000
  }
}
```

```
filter {
  kafka_time_machine {
    logstash_kafka_read_time_shipper => "%{[dynamic_field]}"
  }
}
```

### kafka_topic_indexer

- Value type is [string](https://www.elastic.co/guide/en/logstash/7.13/configuration-file-structure.html#string)
- There is no default value for this setting.

Provide the Kafka topic that the log event was read from on the indexer. Field values can be static or dynamic:

```
filter {
  kafka_time_machine {
    kafka_topic_indexer => "static_field"
  }
}
```

```
filter {
  kafka_time_machine {
    kafka_topic_indexer => "%{[dynamic_field]}"
  }
}
```

### kafka_consumer_group_indexer

- Value type is [string](https://www.elastic.co/guide/en/logstash/7.13/configuration-file-structure.html#string)
- There is no default value for this setting.

Provide the Kafka consumer group that the log event was read from on the indexer. Field values can be static or dynamic:

```
filter {
  kafka_time_machine {
    kafka_consumer_group_indexer => "static_field"
  }
}
```

```
filter {
  kafka_time_machine {
    kafka_consumer_group_indexer => "%{[dynamic_field]}"
  }
}
```

### kafka_append_time_indexer

- Value type is [string](https://www.elastic.co/guide/en/logstash/7.13/configuration-file-structure.html#string)
- There is no default value for this setting.

Provide the epoch time, in milliseconds, at which the log event was added to `kafka_indexer`.
Field values can be static or dynamic:

```
filter {
  kafka_time_machine {
    kafka_append_time_indexer => 1624394191000
  }
}
```

```
filter {
  kafka_time_machine {
    kafka_append_time_indexer => "%{[dynamic_field]}"
  }
}
```

### logstash_kafka_read_time_indexer

- Value type is [string](https://www.elastic.co/guide/en/logstash/7.13/configuration-file-structure.html#string)
- There is no default value for this setting.

Provide the epoch time, in milliseconds, at which the log event was read from `kafka_indexer`. Field values can be static or dynamic:

```
filter {
  kafka_time_machine {
    logstash_kafka_read_time_indexer => 1624394191000
  }
}
```

```
filter {
  kafka_time_machine {
    logstash_kafka_read_time_indexer => "%{[dynamic_field]}"
  }
}
```

### event_owner

- Value type is [string](https://www.elastic.co/guide/en/logstash/7.13/configuration-file-structure.html#string)
- There is no default value for this setting.

Provide the event owner, i.e. the owner of the log. Field values can be static or dynamic:

```
filter {
  kafka_time_machine {
    event_owner => "static_field"
  }
}
```

```
filter {
  kafka_time_machine {
    event_owner => "%{[dynamic_field]}"
  }
}
```

### event_time_ms

- Value type is [string](https://www.elastic.co/guide/en/logstash/7.13/configuration-file-structure.html#string)
- There is no default value for this setting.

Provide the epoch time, in milliseconds, at which this event is being processed. This time is appended to the generated InfluxDB Line Protocol metric. Field values can be static or dynamic:

```
filter {
  kafka_time_machine {
    event_time_ms => 1624394191000
  }
}
```

```
filter {
  kafka_time_machine {
    event_time_ms => "%{[dynamic_field]}"
  }
}
```

### elasticsearch_cluster

- Value type is [string](https://www.elastic.co/guide/en/logstash/7.13/configuration-file-structure.html#string)
- There is no default value for this setting.

Provide an identifier for the Elasticsearch cluster that the log event was sent to.
Field values can be static or dynamic:

```
filter {
  kafka_time_machine {
    elasticsearch_cluster => "static_field"
  }
}
```

```
filter {
  kafka_time_machine {
    elasticsearch_cluster => "%{[dynamic_field]}"
  }
}
```

### elasticsearch_cluster_index

- Value type is [string](https://www.elastic.co/guide/en/logstash/7.13/configuration-file-structure.html#string)
- There is no default value for this setting.

Provide an identifier for the Elasticsearch cluster index that the log event will be indexed in. Field values can be static or dynamic:

```
filter {
  kafka_time_machine {
    elasticsearch_cluster_index => "static_field"
  }
}
```

```
filter {
  kafka_time_machine {
    elasticsearch_cluster_index => "%{[dynamic_field]}"
  }
}
```
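
To show how the settings above might fit together on a `logstash_indexer` node, here is a minimal sketch rather than a reference configuration. It assumes the Kafka input runs with `decorate_events` enabled, so that the `[@metadata][kafka][...]` fields exist, and that the shipper-side values arrive in hypothetical `[ktm][...]` fields on the event. The `[owner]` field, the `[@metadata][now_ms]` helper field, and the cluster names are likewise placeholders.

```
filter {
  # Capture the current time in epoch milliseconds; used here both as the
  # indexer read time and as the metric timestamp (an assumption of this sketch).
  ruby {
    code => "event.set('[@metadata][now_ms]', (Time.now.to_f * 1000).to_i)"
  }

  kafka_time_machine {
    # Shipper-side values, carried on the event in hypothetical [ktm] fields.
    kafka_datacenter_shipper         => "%{[ktm][kafka_datacenter_shipper]}"
    kafka_topic_shipper              => "%{[ktm][kafka_topic_shipper]}"
    kafka_consumer_group_shipper     => "%{[ktm][kafka_consumer_group_shipper]}"
    kafka_append_time_shipper        => "%{[ktm][kafka_append_time_shipper]}"
    logstash_kafka_read_time_shipper => "%{[ktm][logstash_kafka_read_time_shipper]}"

    # Indexer-side values, taken from the Kafka input metadata.
    kafka_topic_indexer              => "%{[@metadata][kafka][topic]}"
    kafka_consumer_group_indexer     => "%{[@metadata][kafka][consumer_group]}"
    kafka_append_time_indexer        => "%{[@metadata][kafka][timestamp]}"
    logstash_kafka_read_time_indexer => "%{[@metadata][now_ms]}"

    event_owner                      => "%{[owner]}"
    event_time_ms                    => "%{[@metadata][now_ms]}"
    elasticsearch_cluster            => "some_cluster_name"
    elasticsearch_cluster_index      => "some_cluster_index_name"
  }
}

output {
  # Route only the generated metric events, using the metadata path shown
  # in the example event; stdout stands in for a real metrics output.
  if [@metadata][ktm_tags][ktm_metric] == "true" {
    stdout {
      codec => line { format => "%{ktm_metric}" }
    }
  }
}
```

The `line` codec prints just the line protocol string, which makes it easy to check the metric before wiring it to a real metrics output.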