spec/rdkafka/consumer_spec.rb in rdkafka-0.16.1 vs spec/rdkafka/consumer_spec.rb in rdkafka-0.17.0
- old
+ new
@@ -256,9 +256,98 @@
expect(records).to be_nil
end
end
end
+ describe "#seek_by" do
+ let(:topic) { "consume_test_topic" }
+ let(:partition) { 0 }
+ let(:offset) { 0 }
+
+ it "should raise an error when seeking fails" do
+ expect(Rdkafka::Bindings).to receive(:rd_kafka_seek).and_return(20)
+ expect {
+ consumer.seek_by(topic, partition, offset)
+ }.to raise_error Rdkafka::RdkafkaError
+ end
+
+ context "subscription" do
+ let(:timeout) { 1000 }
+
+ before do
+ consumer.subscribe(topic)
+
+ # 1. partitions are assigned
+ wait_for_assignment(consumer)
+ expect(consumer.assignment).not_to be_empty
+
+ # 2. eat unrelated messages
+ while(consumer.poll(timeout)) do; end
+ end
+ after { consumer.unsubscribe }
+
+ def send_one_message(val)
+ producer.produce(
+ topic: topic,
+ payload: "payload #{val}",
+ key: "key 1",
+ partition: 0
+ ).wait
+ end
+
+ it "works when a partition is paused" do
+ # 3. get reference message
+ send_one_message(:a)
+ message1 = consumer.poll(timeout)
+ expect(message1&.payload).to eq "payload a"
+
+ # 4. pause the subscription
+ tpl = Rdkafka::Consumer::TopicPartitionList.new
+ tpl.add_topic(topic, 1)
+ consumer.pause(tpl)
+
+ # 5. seek by the previous message fields
+ consumer.seek_by(message1.topic, message1.partition, message1.offset)
+
+ # 6. resume the subscription
+ tpl = Rdkafka::Consumer::TopicPartitionList.new
+ tpl.add_topic(topic, 1)
+ consumer.resume(tpl)
+
+ # 7. ensure same message is read again
+ message2 = consumer.poll(timeout)
+
+ # This is needed because `enable.auto.offset.store` is true but when running in CI that
+ # is overloaded, offset store lags
+ sleep(2)
+
+ consumer.commit
+ expect(message1.offset).to eq message2.offset
+ expect(message1.payload).to eq message2.payload
+ end
+
+ it "allows skipping messages" do
+ # 3. send messages
+ send_one_message(:a)
+ send_one_message(:b)
+ send_one_message(:c)
+
+ # 4. get reference message
+ message = consumer.poll(timeout)
+ expect(message&.payload).to eq "payload a"
+
+ # 5. seek over one message
+ consumer.seek_by(message.topic, message.partition, message.offset + 2)
+
+ # 6. ensure that only one message is available
+ records = consumer.poll(timeout)
+ expect(records&.payload).to eq "payload c"
+ records = consumer.poll(timeout)
+ expect(records).to be_nil
+ end
+ end
+ end
+
describe "#assign and #assignment" do
it "should return an empty assignment if nothing is assigned" do
expect(consumer.assignment).to be_empty
end