spec/rdkafka/consumer_spec.rb in rdkafka-0.16.1 vs spec/rdkafka/consumer_spec.rb in rdkafka-0.17.0

- old
+ new

@@ -256,9 +256,98 @@ expect(records).to be_nil end end end + describe "#seek_by" do + let(:topic) { "consume_test_topic" } + let(:partition) { 0 } + let(:offset) { 0 } + + it "should raise an error when seeking fails" do + expect(Rdkafka::Bindings).to receive(:rd_kafka_seek).and_return(20) + expect { + consumer.seek_by(topic, partition, offset) + }.to raise_error Rdkafka::RdkafkaError + end + + context "subscription" do + let(:timeout) { 1000 } + + before do + consumer.subscribe(topic) + + # 1. partitions are assigned + wait_for_assignment(consumer) + expect(consumer.assignment).not_to be_empty + + # 2. eat unrelated messages + while(consumer.poll(timeout)) do; end + end + after { consumer.unsubscribe } + + def send_one_message(val) + producer.produce( + topic: topic, + payload: "payload #{val}", + key: "key 1", + partition: 0 + ).wait + end + + it "works when a partition is paused" do + # 3. get reference message + send_one_message(:a) + message1 = consumer.poll(timeout) + expect(message1&.payload).to eq "payload a" + + # 4. pause the subscription + tpl = Rdkafka::Consumer::TopicPartitionList.new + tpl.add_topic(topic, 1) + consumer.pause(tpl) + + # 5. seek by the previous message fields + consumer.seek_by(message1.topic, message1.partition, message1.offset) + + # 6. resume the subscription + tpl = Rdkafka::Consumer::TopicPartitionList.new + tpl.add_topic(topic, 1) + consumer.resume(tpl) + + # 7. ensure same message is read again + message2 = consumer.poll(timeout) + + # This is needed because `enable.auto.offset.store` is true but when running in CI that + # is overloaded, offset store lags + sleep(2) + + consumer.commit + expect(message1.offset).to eq message2.offset + expect(message1.payload).to eq message2.payload + end + + it "allows skipping messages" do + # 3. send messages + send_one_message(:a) + send_one_message(:b) + send_one_message(:c) + + # 4. get reference message + message = consumer.poll(timeout) + expect(message&.payload).to eq "payload a" + + # 5. seek over one message + consumer.seek_by(message.topic, message.partition, message.offset + 2) + + # 6. ensure that only one message is available + records = consumer.poll(timeout) + expect(records&.payload).to eq "payload c" + records = consumer.poll(timeout) + expect(records).to be_nil + end + end + end + describe "#assign and #assignment" do it "should return an empty assignment if nothing is assigned" do expect(consumer.assignment).to be_empty end