lib/franz/output/kafka.rb in franz-2.0.1 vs lib/franz/output/kafka.rb in franz-2.0.2

- old
+ new

@@ -23,10 +23,11 @@ opts = { logger: Logger.new(STDOUT), tags: [], input: [], output: { + topic: 'franz', flush_interval: 10, flush_size: 500, client_id: @@host, type: 'sync', compression_codec: 'snappy', @@ -47,10 +48,11 @@ @stop = false @foreground = opts[:foreground] @flush_size = opts[:output].delete :flush_size @flush_interval = opts[:output].delete :flush_interval + @topic = opts[:output].delete :topic kafka_brokers = opts[:output].delete(:brokers) || %w[ localhost:9092 ] kafka_client_id = opts[:output].delete :client_id kafka_config = opts[:output].map { |k,v| [ k, v.is_a?(String) ? v.to_sym : v ] @@ -65,19 +67,19 @@ @messages = [] @thread = Thread.new do loop do - ready_messages = [] @lock.synchronize do ready_messages = @messages @messages = [] + @kafka.send_messages ready_messages unless ready_messages.empty? + log.debug \ + event: 'periodic flush', + num_messages: ready_messages.size end - @kafka.send_messages ready_messages unless ready_messages.empty? - log.debug \ - event: 'periodic flush', - num_messages: ready_messages.size + sleep @flush_interval end end @@ -88,19 +90,22 @@ log.trace \ event: 'publish', raw: event payload = JSON::generate(event) - @messages << Poseidon::MessageToSend.new(event[:type].to_s, payload) - @statz.inc :num_output + @lock.synchronize do + @messages << Poseidon::MessageToSend.new(@topic, payload) - if @messages.size >= @flush_size - @kafka.send_messages @messages - log.debug \ - event: 'flush', - num_messages: @messages.size - @messages = [] + @statz.inc :num_output + + if @messages.size >= @flush_size + @kafka.send_messages @messages + log.debug \ + event: 'flush', + num_messages: @messages.size + @messages = [] + end end end end \ No newline at end of file