lib/franz/output/kafka.rb in franz-2.0.1 vs lib/franz/output/kafka.rb in franz-2.0.2
- old
+ new
@@ -23,10 +23,11 @@
opts = {
logger: Logger.new(STDOUT),
tags: [],
input: [],
output: {
+ topic: 'franz',
flush_interval: 10,
flush_size: 500,
client_id: @@host,
type: 'sync',
compression_codec: 'snappy',
@@ -47,10 +48,11 @@
@stop = false
@foreground = opts[:foreground]
@flush_size = opts[:output].delete :flush_size
@flush_interval = opts[:output].delete :flush_interval
+ @topic = opts[:output].delete :topic
kafka_brokers = opts[:output].delete(:brokers) || %w[ localhost:9092 ]
kafka_client_id = opts[:output].delete :client_id
kafka_config = opts[:output].map { |k,v|
[ k, v.is_a?(String) ? v.to_sym : v ]
@@ -65,19 +67,19 @@
@messages = []
@thread = Thread.new do
loop do
- ready_messages = []
@lock.synchronize do
ready_messages = @messages
@messages = []
+ @kafka.send_messages ready_messages unless ready_messages.empty?
+ log.debug \
+ event: 'periodic flush',
+ num_messages: ready_messages.size
end
- @kafka.send_messages ready_messages unless ready_messages.empty?
- log.debug \
- event: 'periodic flush',
- num_messages: ready_messages.size
+
sleep @flush_interval
end
end
@@ -88,19 +90,22 @@
log.trace \
event: 'publish',
raw: event
payload = JSON::generate(event)
- @messages << Poseidon::MessageToSend.new(event[:type].to_s, payload)
- @statz.inc :num_output
+ @lock.synchronize do
+ @messages << Poseidon::MessageToSend.new(@topic, payload)
- if @messages.size >= @flush_size
- @kafka.send_messages @messages
- log.debug \
- event: 'flush',
- num_messages: @messages.size
- @messages = []
+ @statz.inc :num_output
+
+ if @messages.size >= @flush_size
+ @kafka.send_messages @messages
+ log.debug \
+ event: 'flush',
+ num_messages: @messages.size
+ @messages = []
+ end
end
end
end
\ No newline at end of file