Sha256: feb84e45de536850f84cb78bccc081fb3f4a0f9e8cfca4d1543436a62171d291

Contents?: true

Size: 1.38 KB

Versions: 4

Compression:

Stored size: 1.38 KB

Contents

# Integration spec for the consumer audit pipeline.
#
# Each example publishes messages to a test topic, optionally records some of
# them as ProcessedMessage rows, runs the audit pipeline, and then checks what
# was sent to the logger's `warn`.
RSpec.describe "Auditing pipeline" do # rubocop:disable RSpec/DescribeClass
  let(:topic) { TestTopic.new }
  let(:config) do
    NulogyMessageBusConsumer::Config.new(
      consumer_group_id: random_consumer_group,
      bootstrap_servers: test_bootstrap_servers,
      topic_name: topic.topic_name
    )
  end
  # A spy accepts every message, so the pipeline can log freely and the
  # examples can inspect the calls afterwards.
  let(:logger) { spy }

  after { topic.close }

  context "when some messages have not been processed" do
    it "logs the list of unprocessed messages" do
      produce_message(id: uuid(1))
      process_message(id: uuid(1))
      produce_message(id: uuid(2))

      # Message 2 was produced but never recorded as processed.
      expect(logger).to receive(:warn).with(include_json(
        event: "unprocessed_message",
        kafka_message: {id: uuid(2)}
      ))

      run_audit_pipeline
    end
  end

  context "when all messages have been processed" do
    it "does not log anything" do
      produce_message(id: uuid(1))
      process_message(id: uuid(1))

      run_audit_pipeline

      # Negate the built-in spy matcher. A `have_*` name that is not a defined
      # matcher is treated by RSpec as a predicate call on the object, and a
      # spy answers any message, so such an assertion could never fail.
      expect(logger).not_to have_received(:warn)
    end
  end

  # Builds the audit pipeline from the example's config and logger, then
  # invokes it.
  def run_audit_pipeline
    NulogyMessageBusConsumer
      .consumer_audit_pipeline(config: config, logger: logger)
      .invoke
  end

  # Publishes one message to the test topic whose JSON payload carries `id`.
  def produce_message(id:)
    topic.produce_one_message(
      payload: JSON.dump(id: id)
    )
  end

  # Creates a ProcessedMessage record with the given id; raises if the record
  # cannot be created.
  def process_message(id:)
    NulogyMessageBusConsumer::ProcessedMessage.create!(id: id)
  end

  # Formats an integer as a UUID-shaped string whose last group is the integer
  # zero-padded to 12 digits, e.g. uuid(1) => "00000000-0000-0000-0000-000000000001".
  def uuid(id)
    format("00000000-0000-0000-0000-%012d", id)
  end
end

Version data entries

4 entries across 4 versions & 1 rubygem

Version Path
nulogy_message_bus_consumer-2.0.1 spec/integration/nulogy_message_bus_consumer/auditor_spec.rb
nulogy_message_bus_consumer-2.0.0 spec/integration/nulogy_message_bus_consumer/auditor_spec.rb
nulogy_message_bus_consumer-1.0.0 spec/integration/nulogy_message_bus_consumer/auditor_spec.rb
nulogy_message_bus_consumer-0.5.0 spec/integration/nulogy_message_bus_consumer/auditor_spec.rb