spec/rdkafka/consumer_spec.rb in rdkafka-0.2.0 vs spec/rdkafka/consumer_spec.rb in rdkafka-0.3.0

- old
+ new

@@ -1,5 +1,68 @@ require "spec_helper" describe Rdkafka::Consumer do + let(:config) { rdkafka_config } + let(:consumer) { config.consumer } + let(:producer) { config.producer } + context "subscription" do + it "should subscribe" do + expect(consumer.subscription).to be_empty + + consumer.subscribe("consume_test_topic") + + expect(consumer.subscription).not_to be_empty + expected_subscription = Rdkafka::Consumer::TopicPartitionList.new.tap do |list| + list.add_topic("consume_test_topic") + end + expect(consumer.subscription).to eq expected_subscription + + consumer.unsubscribe + + expect(consumer.subscription).to be_empty + end + end + + describe "committed" do + before do + # Make sure there's a stored offset + report = producer.produce( + topic: "consume_test_topic", + payload: "payload 1", + key: "key 1", + partition: 0 + ).wait + message = wait_for_message( + topic: "consume_test_topic", + delivery_report: report, + config: config + ) + end + + it "should fetch the committed offsets for a specified topic partition list" do + list = Rdkafka::Consumer::TopicPartitionList.new.tap do |list| + list.add_topic("consume_test_topic", [0, 1, 2]) + end + partitions = consumer.committed(list).to_h["consume_test_topic"] + expect(partitions[0].offset).to be > 0 + expect(partitions[1].offset).to eq -1001 + expect(partitions[2].offset).to eq -1001 + end + end + + describe "watermark offsets" do + it "should return the watermark offsets" do + # Make sure there's a message + producer.produce( + topic: "consume_test_topic", + payload: "payload 1", + key: "key 1", + partition: 0 + ).wait + + low, high = consumer.query_watermark_offsets("consume_test_topic", 0, 5000) + expect(low).to eq 0 + expect(high).to be > 0 + end + end end