Sha256: 1676d0f53debd8d3769f649522edb48cb79cee01936b7038bc3d6c38db74dae3
Contents?: true
Size: 1.21 KB
Versions: 5
Compression:
Stored size: 1.21 KB
Contents
# Specs for the StreamMessages step: error logging while streaming, and
# commit behaviour when the handler block reports :failure.
RSpec.describe NulogyMessageBusConsumer::Steps::StreamMessages do
  subject(:step) { described_class.new(logger) }

  # NOTE(review): `config` is not referenced by any example visible here.
  let(:config) { NulogyMessageBusConsumer::Config.new }
  let(:logger) { spy }

  it "logs errors while streaming from kafka" do
    # Verified double: subscribe/unsubscribe are stubbed to return nil,
    # while iterating with #each blows up.
    consumer = instance_double("Rdkafka::Consumer", subscribe: nil, unsubscribe: nil)
    allow(consumer).to receive(:each).and_raise(StandardError, "streaming failed")

    expected_log = include_json(
      event: "message_processing_errored",
      class: "StandardError",
      message: "streaming failed"
    )
    expect(logger).to receive(:error).with(expected_log)

    # The error must still reach the caller after being logged.
    expect do
      step.call(kafka_consumer: consumer)
    end.to raise_error(StandardError)
  end

  context "when handler returns failure" do
    it "does not commit the message in kafka" do
      consumer = spy
      # A single placeholder item is yielded; presumably its content is
      # irrelevant because Message.from_kafka is stubbed below.
      allow(consumer).to receive(:each).and_yield(anything)

      stubbed_message = NulogyMessageBusConsumer::Message.new
      allow(NulogyMessageBusConsumer::Message).to receive(:from_kafka).and_return(stubbed_message)

      expect(consumer).not_to receive(:commit)

      step.call(kafka_consumer: consumer) { :failure }
    end
  end
end
Version data entries
5 entries across 5 versions & 1 rubygems