lib/racecar/consumer_set.rb in racecar-2.0.0.beta1 vs lib/racecar/consumer_set.rb in racecar-2.0.0.beta2
- old
+ new
@@ -1,11 +1,12 @@
module Racecar
class ConsumerSet
MAX_POLL_TRIES = 10
- def initialize(config, logger)
+ def initialize(config, logger, instrumenter = NullInstrumenter)
@config, @logger = config, logger
+ @instrumenter = instrumenter
raise ArgumentError, "Subscriptions must not be empty when subscribing" if @config.subscriptions.empty?
@consumers = []
@consumer_id_iterator = (0...@config.subscriptions.size).cycle
@@ -69,10 +70,12 @@
end
def current
@consumers[@consumer_id_iterator.peek] ||= begin
consumer = Rdkafka::Config.new(rdkafka_config(current_subscription)).consumer
- consumer.subscribe current_subscription.topic
+ @instrumenter.instrument('join_group') do
+ consumer.subscribe current_subscription.topic
+ end
consumer
end
end
def each_subscribed