lib/racecar/runner.rb in racecar-2.9.0.beta1 vs lib/racecar/runner.rb in racecar-2.9.0
- old
+ new
@@ -3,10 +3,11 @@
require "rdkafka"
require "racecar/pause"
require "racecar/message"
require "racecar/message_delivery_error"
require "racecar/erroneous_state_error"
+require "racecar/delivery_callback"
module Racecar
class Runner
attr_reader :processor, :config, :logger
@@ -139,11 +140,11 @@
end
end
def producer
@producer ||= Rdkafka::Config.new(producer_config).producer.tap do |producer|
- producer.delivery_callback = delivery_callback
+ producer.delivery_callback = Racecar::DeliveryCallback.new(instrumenter: @instrumenter)
end
end
def producer_config
# https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
@@ -156,19 +157,9 @@
}
producer_config["compression.codec"] = config.producer_compression_codec.to_s unless config.producer_compression_codec.nil?
producer_config.merge!(config.rdkafka_producer)
producer_config
- end
-
- def delivery_callback
- ->(delivery_report) do
- payload = {
- offset: delivery_report.offset,
- partition: delivery_report.partition
- }
- @instrumenter.instrument("acknowledged_message", payload)
- end
end
def install_signal_handlers
# Stop the consumer on SIGINT, SIGQUIT or SIGTERM.
trap("QUIT") { stop }