lib/racecar/consumer_set.rb in racecar-2.9.0.beta1 vs lib/racecar/consumer_set.rb in racecar-2.9.0
- old
+ new
@@ -67,10 +67,13 @@
each_subscribed(&:close)
end
def current
@consumers[@consumer_id_iterator.peek] ||= begin
- consumer = Rdkafka::Config.new(rdkafka_config(current_subscription)).consumer
+ consumer_config = Rdkafka::Config.new(rdkafka_config(current_subscription))
+ consumer_config.consumer_rebalance_listener = @config.rebalance_listener
+
+ consumer = consumer_config.consumer
@instrumenter.instrument('join_group') do
consumer.subscribe current_subscription.topic
end
consumer
end