lib/franz/output/kafka.rb in franz-2.1.6 vs lib/franz/output/kafka.rb in franz-2.1.7

- old
+ new

@@ -50,34 +50,29 @@ @flush_size = opts[:output].delete :flush_size @flush_interval = opts[:output].delete :flush_interval @topic = opts[:output].delete :topic - kafka_brokers = opts[:output].delete(:brokers) || %w[ localhost:9092 ] - kafka_client_id = opts[:output].delete :client_id - kafka_config = opts[:output].map { |k,v| + @kafka_brokers = opts[:output].delete(:brokers) || %w[ localhost:9092 ] + @kafka_client_id = opts[:output].delete :client_id + @kafka_config = opts[:output].map { |k,v| [ k, v.is_a?(String) ? v.to_sym : v ] } - @kafka = Poseidon::Producer.new \ - kafka_brokers, - kafka_client_id, - Hash[kafka_config] + kafka_connect @lock = Mutex.new @messages = [] @thread = Thread.new do - loop do + until @stop @lock.synchronize do - ready_messages = @messages - @messages = [] - @kafka.send_messages ready_messages unless ready_messages.empty? + num_messages = kafka_send @messages log.debug \ event: 'periodic flush', - num_messages: ready_messages.size + num_messages: num_messages end sleep @flush_interval end end @@ -94,18 +89,15 @@ payload = JSON::generate(event) @lock.synchronize do @messages << Poseidon::MessageToSend.new(@topic, payload) - @statz.inc :num_output - if @messages.size >= @flush_size - @kafka.send_messages @messages + num_messages = kafka_send @messages log.debug \ event: 'flush', - num_messages: @messages.size - @messages = [] + num_messages: num_messages end end end end @@ -133,9 +125,30 @@ end private def log ; @logger end + + def kafka_connect + @kafka = Poseidon::Producer.new \ + @kafka_brokers, + @kafka_client_id, + Hash[@kafka_config] + end + + def kafka_send messages + return 0 if @messages.empty? + @kafka.send_messages @messages + @statz.inc :num_output, @messages.length + size = @messages.size + @messages = [] + return size + rescue Poseidon::Errors::UnableToFetchMetadata + log.warn event: 'output dropped' + kafka_connect + sleep 1 + retry + end end end end \ No newline at end of file