lib/franz/output/kafka.rb in franz-2.0.0 vs lib/franz/output/kafka.rb in franz-2.0.1
- old
+ new
@@ -26,11 +26,10 @@
input: [],
output: {
flush_interval: 10,
flush_size: 500,
client_id: @@host,
- cluster: %w[ localhost:9092 ],
type: 'sync',
compression_codec: 'snappy',
metadata_refresh_interval_ms: 600000,
max_send_retries: 3,
retry_backoff_ms: 100,
@@ -49,16 +48,18 @@
@foreground = opts[:foreground]
@flush_size = opts[:output].delete :flush_size
@flush_interval = opts[:output].delete :flush_interval
- kafka_cluster = opts[:output].delete :cluster
+ kafka_brokers = opts[:output].delete(:brokers) || %w[ localhost:9092 ]
kafka_client_id = opts[:output].delete :client_id
- kafka_config = opts[:output].map { |k,v| v.is_a?(String) ? v.to_sym : v }
+ kafka_config = opts[:output].map { |k,v|
+ [ k, v.is_a?(String) ? v.to_sym : v ]
+ }
@kafka = Poseidon::Producer.new \
- kafka_cluster,
+ kafka_brokers,
kafka_client_id,
Hash[kafka_config]
@lock = Mutex.new
@messages = []
@@ -91,11 +92,11 @@
payload = JSON::generate(event)
@messages << Poseidon::MessageToSend.new(event[:type].to_s, payload)
@statz.inc :num_output
- if @statz.get(:num_output) % @flush_size == 0
- @kafka.send_messages @messages unless @messages.empty?
+ if @messages.size >= @flush_size
+ @kafka.send_messages @messages
log.debug \
event: 'flush',
num_messages: @messages.size
@messages = []
end
\ No newline at end of file