Sha256: feb84e45de536850f84cb78bccc081fb3f4a0f9e8cfca4d1543436a62171d291
Contents?: true
Size: 1.38 KB
Versions: 4
Compression:
Stored size: 1.38 KB
Contents
# Exercises the consumer audit pipeline end to end: messages are produced to a
# test topic, some are recorded as processed, and the pipeline is expected to
# warn (via the injected logger) about any message that was not processed.
RSpec.describe "Auditing pipeline" do # rubocop:disable RSpec/DescribeClass
  let(:topic) { TestTopic.new }
  let(:config) do
    NulogyMessageBusConsumer::Config.new(
      consumer_group_id: random_consumer_group,
      bootstrap_servers: test_bootstrap_servers,
      topic_name: topic.topic_name
    )
  end
  # A spy accepts any message, so calls made on the logger can be verified
  # after the pipeline has run.
  let(:logger) { spy }

  after { topic.close }

  context "when some messages have not been processed" do
    it "logs the list of unprocessed messages" do
      produce_message(id: uuid(1))
      process_message(id: uuid(1))
      produce_message(id: uuid(2))

      # Only message 2 was left unprocessed, so it is the one reported.
      expect(logger).to receive(:warn).with(include_json(
        event: "unprocessed_message",
        kafka_message: {id: uuid(2)}
      ))

      run_audit_pipeline
    end
  end

  context "when all messages have been processed" do
    it "does not log anything" do
      produce_message(id: uuid(1))
      process_message(id: uuid(1))

      run_audit_pipeline

      # Use the negated form of `have_received`. `have_not_received` is not an
      # rspec-mocks matcher; on a spy it falls through to the dynamic `have_*`
      # predicate matcher, which the spy answers truthily, so the expectation
      # could never fail.
      expect(logger).not_to have_received(:warn)
    end
  end

  # Builds the audit pipeline with the spec's config and logger and runs it.
  def run_audit_pipeline
    NulogyMessageBusConsumer
      .consumer_audit_pipeline(config: config, logger: logger)
      .invoke
  end

  # Produces a single message to the test topic whose JSON payload carries +id+.
  def produce_message(id:)
    topic.produce_one_message(
      payload: JSON.dump(id: id)
    )
  end

  # Creates a ProcessedMessage with the given +id+ (raises if creation fails).
  def process_message(id:)
    NulogyMessageBusConsumer::ProcessedMessage.create!(id: id)
  end

  # Returns a deterministic UUID-shaped string whose last group is +id+
  # zero-padded to 12 digits, e.g. uuid(1) => "00000000-0000-0000-0000-000000000001".
  def uuid(id)
    format("00000000-0000-0000-0000-%012d", id)
  end
end
Version data entries
4 entries across 4 versions & 1 rubygems