lib/racecar/runner.rb in racecar-2.9.0.beta1 vs lib/racecar/runner.rb in racecar-2.9.0

- old
+ new

@@ -3,10 +3,11 @@ require "rdkafka" require "racecar/pause" require "racecar/message" require "racecar/message_delivery_error" require "racecar/erroneous_state_error" +require "racecar/delivery_callback" module Racecar class Runner attr_reader :processor, :config, :logger @@ -139,11 +140,11 @@ end end def producer @producer ||= Rdkafka::Config.new(producer_config).producer.tap do |producer| - producer.delivery_callback = delivery_callback + producer.delivery_callback = Racecar::DeliveryCallback.new(instrumenter: @instrumenter) end end def producer_config # https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md @@ -156,19 +157,9 @@ } producer_config["compression.codec"] = config.producer_compression_codec.to_s unless config.producer_compression_codec.nil? producer_config.merge!(config.rdkafka_producer) producer_config - end - - def delivery_callback - ->(delivery_report) do - payload = { - offset: delivery_report.offset, - partition: delivery_report.partition - } - @instrumenter.instrument("acknowledged_message", payload) - end end def install_signal_handlers # Stop the consumer on SIGINT, SIGQUIT or SIGTERM. trap("QUIT") { stop }