spec/rdkafka/consumer_spec.rb in rdkafka-0.14.1 vs spec/rdkafka/consumer_spec.rb in rdkafka-0.15.0

- old
+ new

@@ -52,10 +52,34 @@ expect { consumer.subscription }.to raise_error(Rdkafka::RdkafkaError) end + + context "when using consumer without the poll set" do + let(:consumer) do + config = rdkafka_consumer_config + config.consumer_poll_set = false + config.consumer + end + + it "should subscribe, unsubscribe and return the subscription" do + expect(consumer.subscription).to be_empty + + consumer.subscribe("consume_test_topic") + + expect(consumer.subscription).not_to be_empty + expected_subscription = Rdkafka::Consumer::TopicPartitionList.new.tap do |list| + list.add_topic("consume_test_topic") + end + expect(consumer.subscription).to eq expected_subscription + + consumer.unsubscribe + + expect(consumer.subscription).to be_empty + end + end end describe "#pause and #resume" do context "subscription" do let(:timeout) { 2000 } @@ -271,10 +295,32 @@ consumer.assignment }.to raise_error Rdkafka::RdkafkaError end end + describe '#assignment_lost?' do + it "should not return true as we do have an assignment" do + consumer.subscribe("consume_test_topic") + expected_subscription = Rdkafka::Consumer::TopicPartitionList.new.tap do |list| + list.add_topic("consume_test_topic") + end + + expect(consumer.assignment_lost?).to eq false + consumer.unsubscribe + end + + it "should not return true after voluntary unsubscribing" do + consumer.subscribe("consume_test_topic") + expected_subscription = Rdkafka::Consumer::TopicPartitionList.new.tap do |list| + list.add_topic("consume_test_topic") + end + + consumer.unsubscribe + expect(consumer.assignment_lost?).to eq false + end + end + describe "#close" do it "should close a consumer" do consumer.subscribe("consume_test_topic") 100.times do |i| producer.produce( @@ -1049,9 +1095,32 @@ tpl_response = consumer.offsets_for_times(tpl) expect(tpl_response.to_h["consume_test_topic"][0].offset).to eq message.offset end + end + end + + # Only relevant in case of a consumer with separate queues + describe '#events_poll' do + let(:stats) { [] } + + before { Rdkafka::Config.statistics_callback = ->(published) { stats << published } } + + after { Rdkafka::Config.statistics_callback = nil } + + let(:consumer) do + config = rdkafka_consumer_config('statistics.interval.ms': 100) + config.consumer_poll_set = false + config.consumer + end + + it "expect to run events_poll, operate and propagate stats on events_poll and not poll" do + consumer.subscribe("consume_test_topic") + consumer.poll(1_000) + expect(stats).to be_empty + consumer.events_poll(-1) + expect(stats).not_to be_empty end end describe "a rebalance listener" do let(:consumer) do