# fluent-plugin-kafka, a plugin for [Fluentd](http://fluentd.org)
[![Build Status](https://travis-ci.org/htgc/fluent-plugin-kafka.svg?branch=master)](https://travis-ci.org/htgc/fluent-plugin-kafka)
A fluentd plugin to both consume and produce data for Apache Kafka.
TODO: Also, I need to write tests
## Installation
Add this line to your application's Gemfile:
```ruby
gem 'fluent-plugin-kafka'
```
And then execute:
```
$ bundle
```
Or install it yourself as:
```
$ gem install fluent-plugin-kafka
```
## Usage
### Input plugin (@type 'kafka')
Supports the following Poseidon::PartitionConsumer options:

- `max_bytes` - default: 1048576 (1MB) - Maximum number of bytes to fetch.
- `max_wait_ms` - default: 100 (100ms) - How long to block until the server sends data.
- `min_bytes` - default: 1 (send data as soon as it is ready) - Smallest amount of data the server should send.
- `socket_timeout_ms` - default: 10000 (10s) - How long to wait for a reply from the server. Should be higher than `max_wait_ms`.
Supports starting processing from an assigned offset for specific topics.
See also [Poseidon::PartitionConsumer](http://www.rubydoc.info/github/bpot/poseidon/Poseidon/PartitionConsumer) for more detailed documentation about Poseidon.
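A minimal `<source>` configuration might look like the following sketch. The option names (`host`, `port`, `topics`, `format`) reflect this input plugin's typical settings, and the broker address and topic names are placeholders; consult the plugin source for the full option list.

```
<source>
  @type kafka
  host localhost
  port 9092
  topics app_events,app_errors  # comma-separated list of topics to consume
  format json                   # input text format (text|json|ltsv|msgpack)
</source>
```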
### Input plugin (@type 'kafka_group', supports kafka group)
Supports the following Poseidon::PartitionConsumer options:

- `max_bytes` - default: 1048576 (1MB) - Maximum number of bytes to fetch.
- `max_wait_ms` - default: 100 (100ms) - How long to block until the server sends data.
- `min_bytes` - default: 1 (send data as soon as it is ready) - Smallest amount of data the server should send.
- `socket_timeout_ms` - default: 10000 (10s) - How long to wait for a reply from the server. Should be higher than `max_wait_ms`.
See also [Poseidon::PartitionConsumer](http://www.rubydoc.info/github/bpot/poseidon/Poseidon/PartitionConsumer) for more detailed documentation about Poseidon.
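A sketch of a consumer-group configuration; `consumer_group` and `topics` are this input's typical options, and the broker and Zookeeper addresses below are placeholders:

```
<source>
  @type kafka_group
  brokers localhost:9092
  zookeeper localhost:2181   # used to coordinate the consumer group
  consumer_group my_group
  topics app_events
  format json
</source>
```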
### Output plugin (non-buffered)
This plugin uses the Poseidon producer for writing data. If performance and reliability are concerns, use the `kafka_buffered` output instead.
```
<match *.**>
  @type kafka

  # Brokers: you can choose either brokers or zookeeper.
  brokers <broker1_host>:<broker1_port>,<broker2_host>:<broker2_port>,.. # Set brokers directly
  zookeeper <zookeeper_host>:<zookeeper_port> # Set brokers via Zookeeper
  zookeeper_path <broker path in zookeeper> :default => /brokers/ids # Set path in zookeeper for kafka

  default_topic <output topic>
</match>
```
Supports the following Poseidon::Producer options:

- `max_send_retries` - default: 3 - Number of times to retry sending messages to a leader.
- `required_acks` - default: 0 - The number of acks required per request.
- `ack_timeout_ms` - default: 1500 - How long the producer waits for acks.
- `compression_codec` - default: none - The codec the producer uses to compress messages.
See also [Poseidon::Producer](http://www.rubydoc.info/github/bpot/poseidon/Poseidon/Producer) for more detailed documentation about Poseidon.
This plugin also supports the "snappy" compression codec. Install the snappy gem before using snappy compression:

```
$ gem install snappy
```
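Putting the producer options above together, a non-buffered output configuration might look like this sketch (the broker address and topic are placeholders):

```
<match app.**>
  @type kafka
  brokers localhost:9092
  default_topic my_topic

  # Producer options from the list above
  max_send_retries 3
  required_acks 1
  compression_codec snappy
</match>
```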
#### Load balancing
By default, Poseidon sends messages to brokers in a round-robin manner, but you can set `default_partition_key` in the config file to route messages to a specific broker. If a key named `partition_key` exists in a message, this plugin uses its value as the partition key.
|default_partition_key|partition_key| behavior |
| --- | --- | --- |
|Not set|Not present| All messages are sent round-robin |
|Set|Not present| All messages are sent to a specific broker |
|Not set|Present| Messages with a `partition_key` record are sent to a specific broker; others are sent round-robin |
|Set|Present| Messages with a `partition_key` record are sent to a specific broker with `partition_key`; others are sent to a specific broker with `default_partition_key` |
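For example, the routing described above can be pinned with a configuration like the following sketch; the topic, broker address, and key value are placeholders:

```
<match app.**>
  @type kafka
  brokers localhost:9092
  default_topic events
  # All records without a partition_key field are routed by this key
  default_partition_key sample-key
</match>
```

A record such as `{"message": "hello", "partition_key": "user-123"}` would then be routed by `user-123` instead of `sample-key`.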
### Buffered output plugin
This plugin uses the ruby-kafka producer for writing data. It works with recent Kafka versions.
```
<match *.**>
  @type kafka_buffered

  # Brokers: you can choose either brokers or zookeeper.
  brokers <broker1_host>:<broker1_port>,<broker2_host>:<broker2_port>,.. # Set brokers directly
  zookeeper <zookeeper_host>:<zookeeper_port> # Set brokers via Zookeeper
  zookeeper_path <broker path in zookeeper> :default => /brokers/ids # Set path in zookeeper for kafka

  default_topic <output topic>
</match>
```
Supports the following ruby-kafka producer options:

- `max_send_retries` - default: 1 - Number of times to retry sending messages to a leader.
- `required_acks` - default: 0 - The number of acks required per request.
- `ack_timeout` - default: 5 - How long the producer waits for acks, in seconds.
- `compression_codec` - default: nil - The codec the producer uses to compress messages.
See also [Kafka::Client](http://www.rubydoc.info/gems/ruby-kafka/Kafka/Client) for more detailed documentation about ruby-kafka.
This plugin also supports the "snappy" compression codec. Install the snappy gem before using snappy compression:

```
$ gem install snappy
```
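Combined with the ruby-kafka producer options above, a buffered output configuration might look like this sketch. The broker address and topic are placeholders, and `flush_interval` is a standard Fluentd buffer parameter rather than a plugin-specific option:

```
<match app.**>
  @type kafka_buffered
  brokers localhost:9092
  default_topic my_topic

  # Standard Fluentd buffer option: flush the buffer every 10 seconds
  flush_interval 10s

  # ruby-kafka producer options from the list above
  required_acks 1
  ack_timeout 5
  compression_codec snappy
</match>
```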
## Contributing
1. Fork it
2. Create your feature branch (`git checkout -b my-new-feature`)
3. Commit your changes (`git commit -am 'Added some feature'`)
4. Push to the branch (`git push origin my-new-feature`)
5. Create a new Pull Request