spec/rdkafka/consumer_spec.rb in rdkafka-0.14.1 vs spec/rdkafka/consumer_spec.rb in rdkafka-0.15.0
- old
+ new
@@ -52,10 +52,34 @@
expect {
consumer.subscription
}.to raise_error(Rdkafka::RdkafkaError)
end
+
+ context "when using consumer without the poll set" do
+ let(:consumer) do
+ config = rdkafka_consumer_config
+ config.consumer_poll_set = false
+ config.consumer
+ end
+
+ it "should subscribe, unsubscribe and return the subscription" do
+ expect(consumer.subscription).to be_empty
+
+ consumer.subscribe("consume_test_topic")
+
+ expect(consumer.subscription).not_to be_empty
+ expected_subscription = Rdkafka::Consumer::TopicPartitionList.new.tap do |list|
+ list.add_topic("consume_test_topic")
+ end
+ expect(consumer.subscription).to eq expected_subscription
+
+ consumer.unsubscribe
+
+ expect(consumer.subscription).to be_empty
+ end
+ end
end
describe "#pause and #resume" do
context "subscription" do
let(:timeout) { 2000 }
@@ -271,10 +295,32 @@
consumer.assignment
}.to raise_error Rdkafka::RdkafkaError
end
end
+ describe '#assignment_lost?' do
+ it "should not return true as we do have an assignment" do
+ consumer.subscribe("consume_test_topic")
+ expected_subscription = Rdkafka::Consumer::TopicPartitionList.new.tap do |list|
+ list.add_topic("consume_test_topic")
+ end
+
+ expect(consumer.assignment_lost?).to eq false
+ consumer.unsubscribe
+ end
+
+ it "should not return true after voluntary unsubscribing" do
+ consumer.subscribe("consume_test_topic")
+ expected_subscription = Rdkafka::Consumer::TopicPartitionList.new.tap do |list|
+ list.add_topic("consume_test_topic")
+ end
+
+ consumer.unsubscribe
+ expect(consumer.assignment_lost?).to eq false
+ end
+ end
+
describe "#close" do
it "should close a consumer" do
consumer.subscribe("consume_test_topic")
100.times do |i|
producer.produce(
@@ -1049,9 +1095,32 @@
tpl_response = consumer.offsets_for_times(tpl)
expect(tpl_response.to_h["consume_test_topic"][0].offset).to eq message.offset
end
+ end
+ end
+
+ # Only relevant in case of a consumer with separate queues
+ describe '#events_poll' do
+ let(:stats) { [] }
+
+ before { Rdkafka::Config.statistics_callback = ->(published) { stats << published } }
+
+ after { Rdkafka::Config.statistics_callback = nil }
+
+ let(:consumer) do
+ config = rdkafka_consumer_config('statistics.interval.ms': 100)
+ config.consumer_poll_set = false
+ config.consumer
+ end
+
+ it "expect to run events_poll, operate and propagate stats on events_poll and not poll" do
+ consumer.subscribe("consume_test_topic")
+ consumer.poll(1_000)
+ expect(stats).to be_empty
+ consumer.events_poll(-1)
+ expect(stats).not_to be_empty
end
end
describe "a rebalance listener" do
let(:consumer) do