lib/racecar/consumer_set.rb in racecar-2.0.0.beta1 vs lib/racecar/consumer_set.rb in racecar-2.0.0.beta2

- old
+ new

@@ -1,11 +1,12 @@ module Racecar class ConsumerSet MAX_POLL_TRIES = 10 - def initialize(config, logger) + def initialize(config, logger, instrumenter = NullInstrumenter) @config, @logger = config, logger + @instrumenter = instrumenter raise ArgumentError, "Subscriptions must not be empty when subscribing" if @config.subscriptions.empty? @consumers = [] @consumer_id_iterator = (0...@config.subscriptions.size).cycle @@ -69,10 +70,12 @@ end def current @consumers[@consumer_id_iterator.peek] ||= begin consumer = Rdkafka::Config.new(rdkafka_config(current_subscription)).consumer - consumer.subscribe current_subscription.topic + @instrumenter.instrument('join_group') do + consumer.subscribe current_subscription.topic + end consumer end end def each_subscribed