Sha256: c09980773e7c834426c112a0f89c86747e3d2e937cb2c00b787d3c005980ee77

Contents?: true

Size: 1.24 KB

Versions: 2

Compression:

Stored size: 1.24 KB

Contents

RSpec.describe NulogyMessageBusConsumer::KafkaUtils do
  subject(:utils) { NulogyMessageBusConsumer::KafkaUtils }

  let(:topic) { TestTopic.new }

  after { topic.close }

  describe "#seek_beginning" do
    it "updates the consumer offset to the beginning of the topic" do
      topic.produce_one_message(payload: "First Message")
      expect(topic.consume_one_message).to have_attributes(payload: "First Message")
      expect(topic.consume_one_message).to eq(nil)

      utils.seek_beginning(topic.consumer)

      expect(topic.consume_one_message).to have_attributes(payload: "First Message")
    end
  end

  describe "#seek_end" do
    it "updates the consumer offset to the end of the topic" do
      topic.produce_one_message(payload: "First Message")

      utils.seek_ending(topic.consumer)

      expect(topic.consume_one_message).to eq(nil)
    end
  end

  describe "#every_message_until_none_are_left" do
    it "does not keep the connection open when there are no messages" do
      topic.produce_one_message(payload: "The Only Message")

      enum = utils.every_message_until_none_are_left(topic.consumer, Kafka::CONSUMER_POLL_TIMEOUT)

      expect(enum).to match([
        have_attributes(payload: "The Only Message")
      ])
    end
  end
end

Version data entries

2 entries across 2 versions & 1 rubygems

Version Path
nulogy_message_bus_consumer-3.0.0 spec/integration/nulogy_message_bus_consumer/kafka_utils_spec.rb
nulogy_message_bus_consumer-2.0.1 spec/integration/nulogy_message_bus_consumer/kafka_utils_spec.rb