Sha256: 1676d0f53debd8d3769f649522edb48cb79cee01936b7038bc3d6c38db74dae3

Contents?: true

Size: 1.21 KB

Versions: 5

Compression:

Stored size: 1.21 KB

Contents

# Unit specs for the StreamMessages step. They cover two behaviours:
# errors raised while iterating the consumer are logged and re-raised, and
# nothing is committed when the handler block returns :failure.
RSpec.describe NulogyMessageBusConsumer::Steps::StreamMessages do
  subject(:step) { described_class.new(logger) }

  # A spy accepts any message, so examples only need to set expectations on
  # the logger calls they actually care about.
  let(:logger) { spy }

  it "logs errors while streaming from kafka" do
    # subscribe/unsubscribe are permitted no-ops; any call to #each raises.
    consumer = instance_double("Rdkafka::Consumer", subscribe: nil, unsubscribe: nil)
    allow(consumer).to receive(:each).and_raise(StandardError, "streaming failed")

    expected_log_entry = include_json(
      event: "message_processing_errored",
      class: "StandardError",
      message: "streaming failed"
    )
    expect(logger).to receive(:error).with(expected_log_entry)

    # Logging must not swallow the failure: the caller still sees an error.
    expect { step.call(kafka_consumer: consumer) }.to raise_error(StandardError)
  end

  context "when handler returns failure" do
    it "does not commit the message in kafka" do
      consumer = spy
      parsed_message = NulogyMessageBusConsumer::Message.new

      # Yield a single placeholder payload. Message.from_kafka is stubbed to
      # return a canned message regardless of the argument it is given.
      allow(consumer).to receive(:each).and_yield(anything)
      allow(NulogyMessageBusConsumer::Message).to receive(:from_kafka).and_return(parsed_message)

      expect(consumer).not_to receive(:commit)

      # The block plays the role of the message handler and reports failure.
      step.call(kafka_consumer: consumer) { :failure }
    end
  end
end

Version data entries

5 entries across 5 versions & 1 rubygem

Version Path
nulogy_message_bus_consumer-3.0.0 spec/unit/nulogy_message_bus_consumer/steps/stream_messages_spec.rb
nulogy_message_bus_consumer-2.0.1 spec/unit/nulogy_message_bus_consumer/steps/stream_messages_spec.rb
nulogy_message_bus_consumer-2.0.0 spec/unit/nulogy_message_bus_consumer/steps/stream_messages_spec.rb
nulogy_message_bus_consumer-1.0.0 spec/unit/nulogy_message_bus_consumer/steps/stream_messages_spec.rb
nulogy_message_bus_consumer-0.5.0 spec/unit/nulogy_message_bus_consumer/steps/stream_messages_spec.rb