lib/racecar/runner.rb in racecar-2.8.2 vs lib/racecar/runner.rb in racecar-2.9.0.beta1

- old
+ new

@@ -65,10 +65,12 @@ # Main loop loop do break if @stop_requested resume_paused_partitions + + @instrumenter.instrument("start_main_loop", instrumentation_payload) @instrumenter.instrument("main_loop", instrumentation_payload) do case process_method when :batch then msg_per_part = consumer.batch_poll(config.max_wait_time_ms).group_by(&:partition) msg_per_part.each_value do |messages| @@ -92,10 +94,11 @@ end end ensure producer.close Racecar::Datadog.close if Object.const_defined?("Racecar::Datadog") + @instrumenter.instrument("shut_down", instrumentation_payload || {}) end def stop @stop_requested = true end @@ -147,10 +150,12 @@ producer_config = { "bootstrap.servers" => config.brokers.join(","), "client.id" => config.client_id, "statistics.interval.ms" => config.statistics_interval_ms, "message.timeout.ms" => config.message_timeout * 1000, + "partitioner" => config.partitioner.to_s, } + producer_config["compression.codec"] = config.producer_compression_codec.to_s unless config.producer_compression_codec.nil? producer_config.merge!(config.rdkafka_producer) producer_config end