lib/franz/output/kafka.rb in franz-2.0.0 vs lib/franz/output/kafka.rb in franz-2.0.1

- old
+ new

@@ -26,11 +26,10 @@
       input: [],
       output: {
         flush_interval: 10,
         flush_size: 500,
         client_id: @@host,
-        cluster: %w[ localhost:9092 ],
         type: 'sync',
         compression_codec: 'snappy',
         metadata_refresh_interval_ms: 600000,
         max_send_retries: 3,
         retry_backoff_ms: 100,
@@ -49,16 +48,18 @@
     @foreground = opts[:foreground]
     @flush_size = opts[:output].delete :flush_size
     @flush_interval = opts[:output].delete :flush_interval
-    kafka_cluster = opts[:output].delete :cluster
+    kafka_brokers = opts[:output].delete(:brokers) || %w[ localhost:9092 ]
     kafka_client_id = opts[:output].delete :client_id
-    kafka_config = opts[:output].map { |k,v| v.is_a?(String) ? v.to_sym : v }
+    kafka_config = opts[:output].map { |k,v|
+      [ k, v.is_a?(String) ? v.to_sym : v ]
+    }
     @kafka = Poseidon::Producer.new \
-      kafka_cluster,
+      kafka_brokers,
       kafka_client_id,
       Hash[kafka_config]
     @lock = Mutex.new
     @messages = []
@@ -91,11 +92,11 @@
       payload = JSON::generate(event)
       @messages << Poseidon::MessageToSend.new(event[:type].to_s, payload)
       @statz.inc :num_output
-      if @statz.get(:num_output) % @flush_size == 0
-        @kafka.send_messages @messages unless @messages.empty?
+      if @messages.size >= @flush_size
+        @kafka.send_messages @messages
         log.debug \
           event: 'flush',
           num_messages: @messages.size
         @messages = []
       end
\ No newline at end of file