lib/racecar/runner.rb in racecar-2.8.2 vs lib/racecar/runner.rb in racecar-2.9.0.beta1
- old
+ new
@@ -65,10 +65,12 @@
# Main loop
loop do
break if @stop_requested
resume_paused_partitions
+
+ @instrumenter.instrument("start_main_loop", instrumentation_payload)
@instrumenter.instrument("main_loop", instrumentation_payload) do
case process_method
when :batch then
msg_per_part = consumer.batch_poll(config.max_wait_time_ms).group_by(&:partition)
msg_per_part.each_value do |messages|
@@ -92,10 +94,11 @@
end
end
ensure
producer.close
Racecar::Datadog.close if Object.const_defined?("Racecar::Datadog")
+ @instrumenter.instrument("shut_down", instrumentation_payload || {})
end
def stop
@stop_requested = true
end
@@ -147,10 +150,12 @@
producer_config = {
"bootstrap.servers" => config.brokers.join(","),
"client.id" => config.client_id,
"statistics.interval.ms" => config.statistics_interval_ms,
"message.timeout.ms" => config.message_timeout * 1000,
+ "partitioner" => config.partitioner.to_s,
}
+
producer_config["compression.codec"] = config.producer_compression_codec.to_s unless config.producer_compression_codec.nil?
producer_config.merge!(config.rdkafka_producer)
producer_config
end