Sha256: c09980773e7c834426c112a0f89c86747e3d2e937cb2c00b787d3c005980ee77
Contents?: true
Size: 1.24 KB
Versions: 2
Compression:
Stored size: 1.24 KB
Contents
RSpec.describe NulogyMessageBusConsumer::KafkaUtils do subject(:utils) { NulogyMessageBusConsumer::KafkaUtils } let(:topic) { TestTopic.new } after { topic.close } describe "#seek_beginning" do it "updates the consumer offset to the beginning of the topic" do topic.produce_one_message(payload: "First Message") expect(topic.consume_one_message).to have_attributes(payload: "First Message") expect(topic.consume_one_message).to eq(nil) utils.seek_beginning(topic.consumer) expect(topic.consume_one_message).to have_attributes(payload: "First Message") end end describe "#seek_end" do it "updates the consumer offset to the end of the topic" do topic.produce_one_message(payload: "First Message") utils.seek_ending(topic.consumer) expect(topic.consume_one_message).to eq(nil) end end describe "#every_message_until_none_are_left" do it "does not keep the connection open when there are no messages" do topic.produce_one_message(payload: "The Only Message") enum = utils.every_message_until_none_are_left(topic.consumer, Kafka::CONSUMER_POLL_TIMEOUT) expect(enum).to match([ have_attributes(payload: "The Only Message") ]) end end end
Version data entries
2 entries across 2 versions & 1 rubygems