lib/franz/output/kafka.rb in franz-2.1.6 vs lib/franz/output/kafka.rb in franz-2.1.7
- old
+ new
@@ -50,34 +50,29 @@
@flush_size = opts[:output].delete :flush_size
@flush_interval = opts[:output].delete :flush_interval
@topic = opts[:output].delete :topic
- kafka_brokers = opts[:output].delete(:brokers) || %w[ localhost:9092 ]
- kafka_client_id = opts[:output].delete :client_id
- kafka_config = opts[:output].map { |k,v|
+ @kafka_brokers = opts[:output].delete(:brokers) || %w[ localhost:9092 ]
+ @kafka_client_id = opts[:output].delete :client_id
+ @kafka_config = opts[:output].map { |k,v|
[ k, v.is_a?(String) ? v.to_sym : v ]
}
- @kafka = Poseidon::Producer.new \
- kafka_brokers,
- kafka_client_id,
- Hash[kafka_config]
+ kafka_connect
@lock = Mutex.new
@messages = []
@thread = Thread.new do
- loop do
+ until @stop
@lock.synchronize do
- ready_messages = @messages
- @messages = []
- @kafka.send_messages ready_messages unless ready_messages.empty?
+ num_messages = kafka_send @messages
log.debug \
event: 'periodic flush',
- num_messages: ready_messages.size
+ num_messages: num_messages
end
sleep @flush_interval
end
end
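
The flusher thread now runs until @stop instead of an unbounded loop, so it can be shut down cleanly, and the per-interval flush is delegated to the new kafka_send helper. A minimal sketch of this stoppable periodic-flush pattern, assuming a Mutex-guarded buffer; the class name, deliver, and the default interval below are illustrative and not part of Franz:

class PeriodicFlusher
  # Stoppable background flusher: drains the buffer every interval
  # seconds until stop is called. deliver stands in for the real
  # Kafka send and just reports the batch size.
  def initialize interval: 5
    @interval = interval
    @lock     = Mutex.new
    @buffer   = []
    @stop     = false
    @thread   = Thread.new do
      until @stop
        @lock.synchronize { flush }
        sleep @interval
      end
    end
  end

  def push message
    @lock.synchronize { @buffer << message }
  end

  def stop
    @stop = true
    @thread.join
    @lock.synchronize { flush } # final drain on shutdown
  end

private
  def flush
    return 0 if @buffer.empty?
    size = @buffer.size
    deliver @buffer
    @buffer = []
    size
  end

  def deliver batch
    puts "delivering #{batch.size} message(s)"
  end
end
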
@@ -94,18 +89,15 @@
payload = JSON::generate(event)
@lock.synchronize do
@messages << Poseidon::MessageToSend.new(@topic, payload)
- @statz.inc :num_output
-
if @messages.size >= @flush_size
- @kafka.send_messages @messages
+ num_messages = kafka_send @messages
log.debug \
event: 'flush',
- num_messages: @messages.size
- @messages = []
+ num_messages: num_messages
end
end
end
end
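
On the publish path, each event is serialized with JSON::generate, wrapped in a Poseidon::MessageToSend for the configured topic, and buffered under the lock; once the buffer reaches @flush_size, the same kafka_send helper drains it. A rough usage sketch of that path, assuming the poseidon gem and a broker at localhost:9092; TOPIC, FLUSH_SIZE, and the client id are placeholders rather than Franz's configuration:

require 'json'
require 'poseidon'

# Placeholder settings; Franz reads the real values from opts[:output].
TOPIC      = 'example_topic'
FLUSH_SIZE = 500

@lock     = Mutex.new
@messages = []
@kafka    = Poseidon::Producer.new %w[ localhost:9092 ], 'example_client'

def publish event
  payload = JSON::generate event
  @lock.synchronize do
    @messages << Poseidon::MessageToSend.new(TOPIC, payload)
    # Size-triggered flush, mirroring the check in the diff above.
    if @messages.size >= FLUSH_SIZE
      @kafka.send_messages @messages
      @messages = []
    end
  end
end

publish 'path' => '/var/log/example.log', 'line' => 'hello'
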
@@ -133,9 +125,30 @@
end
private
def log ; @logger end
+
+ def kafka_connect
+ @kafka = Poseidon::Producer.new \
+ @kafka_brokers,
+ @kafka_client_id,
+ Hash[@kafka_config]
+ end
+
+ def kafka_send messages
+ return 0 if @messages.empty?
+ @kafka.send_messages @messages
+ @statz.inc :num_output, @messages.length
+ size = @messages.size
+ @messages = []
+ return size
+ rescue Poseidon::Errors::UnableToFetchMetadata
+ log.warn event: 'output dropped'
+ kafka_connect
+ sleep 1
+ retry
+ end
end
end
end
\ No newline at end of file
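
The new kafka_connect and kafka_send helpers concentrate the Poseidon handling in one place: kafka_send drains the shared @messages buffer (its messages argument is left unused), bumps the :num_output counter by the batch size, and on Poseidon::Errors::UnableToFetchMetadata rebuilds the producer, sleeps a second, and retries the still-buffered batch. A compact sketch of that reconnect-and-retry pattern in isolation, assuming a reachable broker; the Poseidon calls match the diff, while BROKERS, CLIENT_ID, TOPIC, and send_batch are placeholder names:

require 'poseidon'

BROKERS   = %w[ localhost:9092 ]  # placeholder broker list
CLIENT_ID = 'example_client'
TOPIC     = 'example_topic'

def connect
  @producer = Poseidon::Producer.new BROKERS, CLIENT_ID
end

# Sends a batch, reconnecting and retrying when broker metadata
# cannot be fetched (e.g. the broker is restarting). The batch is
# only considered sent once send_messages returns without raising.
def send_batch messages
  return 0 if messages.empty?
  @producer.send_messages messages
  messages.size
rescue Poseidon::Errors::UnableToFetchMetadata
  connect
  sleep 1
  retry
end

connect
send_batch [ Poseidon::MessageToSend.new(TOPIC, '{"hello":"world"}') ]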