# fluent-plugin-kafka, a plugin for [Fluentd](http://fluentd.org)

[![Build Status](https://travis-ci.org/htgc/fluent-plugin-kafka.svg?branch=master)](https://travis-ci.org/htgc/fluent-plugin-kafka)

A fluentd plugin to both consume and produce data for Apache Kafka.

TODO: Also, I need to write tests

## Installation

Add this line to your application's Gemfile:

    gem 'fluent-plugin-kafka'

And then execute:

    $ bundle

Or install it yourself as:

    $ gem install fluent-plugin-kafka

## Usage

### Input plugin (@type 'kafka')

    <source>
      @type kafka
      host <broker host>
      port <broker port: default=9092>
      topics <listening topics (separated by comma ',')>
      format <input text type (text|json|ltsv|msgpack)>
      message_key <key (Optional, for text format only, default is message)>
      add_prefix <tag prefix (Optional)>
      add_suffix <tag suffix (Optional)>
      max_bytes           (integer)    :default => nil (Use default of Poseidon)
      max_wait_ms         (integer)    :default => nil (Use default of Poseidon)
      min_bytes           (integer)    :default => nil (Use default of Poseidon)
      socket_timeout_ms   (integer)    :default => nil (Use default of Poseidon)
    </source>

Supports the following Poseidon::PartitionConsumer options:

- max_bytes - default: 1048576 (1MB) - Maximum number of bytes to fetch.
- max_wait_ms - default: 100 (100ms) - How long to block until the server sends us data.
- min_bytes - default: 1 (Send us data as soon as it is ready) - Smallest amount of data the server should send us.
- socket_timeout_ms - default: 10000 (10s) - How long to wait for a reply from the server. Should be higher than max_wait_ms.

Processing can also start from an assigned offset for specific topics:

    <source>
      @type kafka
      host <broker host>
      port <broker port>
      format <input text type (text|json|ltsv|msgpack)>
      <topic>
        topic <listening topic>
        partition <listening partition: default=0>
        offset <listening start offset: default=-1>
      </topic>
      <topic>
        topic <listening topic>
        partition <listening partition: default=0>
        offset <listening start offset: default=-1>
      </topic>
    </source>

See also [Poseidon::PartitionConsumer](http://www.rubydoc.info/github/bpot/poseidon/Poseidon/PartitionConsumer) for more detailed documentation about Poseidon.

### Input plugin (@type 'kafka_group', supports kafka group)

    <source>
      @type kafka_group
      brokers <list of broker host:port, separated by comma ','>
      zookeepers <list of zookeeper host:port, separated by comma ','>
      consumer_group <consumer group name>
      topics <listening topics (separated by comma ',')>
      format <input text type (text|json|ltsv|msgpack)>
      message_key <key (Optional, for text format only, default is message)>
      add_prefix <tag prefix (Optional)>
      add_suffix <tag suffix (Optional)>
      max_bytes           (integer)    :default => nil (Use default of Poseidon)
      max_wait_ms         (integer)    :default => nil (Use default of Poseidon)
      min_bytes           (integer)    :default => nil (Use default of Poseidon)
      socket_timeout_ms   (integer)    :default => nil (Use default of Poseidon)
    </source>

Supports the following Poseidon::PartitionConsumer options:

- max_bytes - default: 1048576 (1MB) - Maximum number of bytes to fetch.
- max_wait_ms - default: 100 (100ms) - How long to block until the server sends us data.
- min_bytes - default: 1 (Send us data as soon as it is ready) - Smallest amount of data the server should send us.
- socket_timeout_ms - default: 10000 (10s) - How long to wait for a reply from the server. Should be higher than max_wait_ms.

See also [Poseidon::PartitionConsumer](http://www.rubydoc.info/github/bpot/poseidon/Poseidon/PartitionConsumer) for more detailed documentation about Poseidon.

### Output plugin (non-buffered)

This plugin uses the Poseidon producer for writing data. For performance and reliability reasons, use the `kafka_buffered` output instead.

    <match *.**>
      @type kafka

      # Brokers: you can choose either brokers or zookeeper.
      brokers        <broker1_host>:<broker1_port>,<broker2_host>:<broker2_port>,..  # Set brokers directly
      zookeeper      <zookeeper_host>:<zookeeper_port>                               # Set brokers via Zookeeper
      zookeeper_path <broker path in zookeeper> :default => /brokers/ids             # Set path in zookeeper for kafka

      default_topic         <output topic>
      default_partition_key (string)           :default => nil
      output_data_type      (json|ltsv|msgpack|attr:<record name>|<formatter name>)
      output_include_tag    (true|false)       :default => false
      output_include_time   (true|false)       :default => false
      max_send_retries      (integer)          :default => 3
      required_acks         (integer)          :default => 0
      ack_timeout_ms        (integer)          :default => 1500
      compression_codec     (none|gzip|snappy) :default => none
    </match>

Supports the following Poseidon::Producer options:

- max_send_retries - default: 3 - Number of times to retry sending of messages to a leader.
- required_acks - default: 0 - The number of acks required per request.
- ack_timeout_ms - default: 1500 - How long the producer waits for acks.
- compression_codec - default: none - The codec the producer uses to compress messages.

See also [Poseidon::Producer](http://www.rubydoc.info/github/bpot/poseidon/Poseidon/Producer) for more detailed documentation about Poseidon.

This plugin also supports the "snappy" compression codec. Install the snappy gem before using snappy compression:

    $ gem install snappy

#### Load balancing

By default, Poseidon sends messages to brokers in a round-robin manner, but you can set `default_partition_key` in the configuration file to route messages to a specific partition. If a key named `partition_key` exists in a message, this plugin uses its value as the partition key. An example configuration is sketched after the table below.

|default_partition_key|partition_key| behavior |
| --- | --- | --- |
|Not set|Not present| All messages are sent in round-robin |
|Set| Not present| All messages are sent to the specific partition |
|Not set| Present | Messages with a partition_key record are sent to the specific partition; others are sent in round-robin |
|Set| Present | Messages with a partition_key record are sent to the specific partition with partition_key; others are sent to the specific partition with default_partition_key |
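For example, the following sketch routes events to a fixed partition unless a record carries its own `partition_key`. The match pattern, the broker address `localhost:9092`, the topic `messages`, and the key `dummy_key` are placeholder values for illustration, not defaults of this plugin.

    <match app.**>
      @type kafka
      brokers localhost:9092           # placeholder broker address
      default_topic messages           # placeholder topic name
      default_partition_key dummy_key  # placeholder key used when a record has no partition_key
      output_data_type json
    </match>

With this configuration, a record such as `{"message":"hello"}` is routed by `dummy_key`, while `{"message":"hello","partition_key":"user1"}` is routed by `user1`.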
### Buffered output plugin

This plugin uses the ruby-kafka producer for writing data and works with recent Kafka versions.

    <match *.**>
      @type kafka_buffered

      # Brokers: you can choose either brokers or zookeeper.
      brokers        <broker1_host>:<broker1_port>,<broker2_host>:<broker2_port>,..  # Set brokers directly
      zookeeper      <zookeeper_host>:<zookeeper_port>                               # Set brokers via Zookeeper
      zookeeper_path <broker path in zookeeper> :default => /brokers/ids             # Set path in zookeeper for kafka

      default_topic         <output topic>
      default_partition_key (string)      :default => nil
      flush_interval        <flush interval (sec) :default => 60>
      buffer_type           (file|memory)
      output_data_type      (json|ltsv|msgpack|attr:<record name>|<formatter name>)
      output_include_tag    (true|false)  :default => false
      output_include_time   (true|false)  :default => false
      max_send_retries      (integer)     :default => 1
      required_acks         (integer)     :default => 0
      ack_timeout           (integer)     :default => 5
      compression_codec     (gzip|snappy) :default => none
    </match>

Supports the following ruby-kafka producer options:

- max_send_retries - default: 1 - Number of times to retry sending of messages to a leader.
- required_acks - default: 0 - The number of acks required per request.
- ack_timeout - default: 5 - How long the producer waits for acks, in seconds.
- compression_codec - default: none - The codec the producer uses to compress messages.

See also [Kafka::Client](http://www.rubydoc.info/gems/ruby-kafka/Kafka/Client) for more detailed documentation about ruby-kafka.

This plugin also supports the "snappy" compression codec. Install the snappy gem before using snappy compression:

    $ gem install snappy

## Contributing

1. Fork it
2. Create your feature branch (`git checkout -b my-new-feature`)
3. Commit your changes (`git commit -am 'Added some feature'`)
4. Push to the branch (`git push origin my-new-feature`)
5. Create a new Pull Request
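## Full configuration example

As an illustration of how the pieces above fit together, here is a sketch of a pipeline that consumes one topic and produces events to another through the buffered output. Every host, port, topic, tag, and path below is a placeholder chosen for illustration, not a default of this plugin; adjust them for your deployment.

    # Consume messages from the "raw-logs" topic (placeholder name).
    <source>
      @type kafka
      host localhost            # placeholder broker host
      port 9092                 # placeholder broker port
      topics raw-logs
      format json
      add_prefix kafka          # events are tagged kafka.raw-logs
    </source>

    # Produce the events to the "processed-logs" topic (placeholder name).
    <match kafka.**>
      @type kafka_buffered
      brokers localhost:9092    # placeholder broker list
      default_topic processed-logs
      buffer_type file
      buffer_path /var/log/fluentd/kafka.buffer  # standard Fluentd buffer parameter; placeholder path
      flush_interval 10
      output_data_type json
      required_acks 1           # wait for the leader's ack instead of fire-and-forget
    </match>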