Sha256: ca7d16fe4390ed13b97df676291c65bc01e00075cc9587c7645f4c7987484fb9

Contents?: true

Size: 1.05 KB

Versions: 4

Compression:

Stored size: 1.05 KB

Contents

module NulogyMessageBusConsumer
  module Tasks
    # Spec for SuperviseConsumerLag: when the consumer reports the same lag
    # that the tracker has already recorded, the task is expected to log a
    # warning and call `kill` on the injected killable object.
    RSpec.describe SuperviseConsumerLag do
      # Spies record every message sent to them so the example can assert on
      # the calls afterwards.
      let(:logger) { spy }
      let(:killable) { spy }
      let(:tracker) { LagTracker.new(failing_checks: 2) }
      let(:task) do
        described_class.new(logger, tracker: tracker, killable: killable)
      end

      it "kills the main thread after lag does not change" do
        stalled_lag = {"partition-1" => 2, "partition-2" => 3}

        # Consumer double that keeps reporting the same, unchanged lag.
        kafka_consumer = instance_double(
          Rdkafka::Consumer,
          lag: {"topic" => stalled_lag},
          committed: nil
        )

        # Skip waiting for partition assignment.
        allow(KafkaUtils)
          .to receive(:wait_for_assignment)
          .with(kafka_consumer)
          .and_return(nil)
        # Stub `exit` on the current thread so the thread running the spec
        # cannot be terminated by the code under test.
        # NOTE(review): the task receives `killable` explicitly, so this stub
        # may be a leftover - confirm against the task implementation.
        allow(Thread.current).to receive(:exit)

        # Pre-load the tracker with two identical readings.
        # NOTE(review): presumably this matches `failing_checks: 2` so the
        # next identical reading counts as a failure - confirm in LagTracker.
        2.times { tracker.update({"topic" => stalled_lag}) }

        task.extract_args(kafka_consumer: kafka_consumer)
        task.call

        expect(logger)
          .to have_received(:warn)
          .with(/Assigned partition lag has not changed/)
        expect(killable).to have_received(:kill)
      end
    end
  end
end

Version data entries

4 entries across 4 versions & 1 rubygems

Version Path
nulogy_message_bus_consumer-3.0.0 spec/integration/nulogy_message_bus_consumer/tasks/supervise_consumer_lag_spec.rb
nulogy_message_bus_consumer-2.0.1 spec/integration/nulogy_message_bus_consumer/tasks/supervise_consumer_lag_spec.rb
nulogy_message_bus_consumer-2.0.0 spec/integration/nulogy_message_bus_consumer/tasks/supervise_consumer_lag_spec.rb
nulogy_message_bus_consumer-1.0.0 spec/integration/nulogy_message_bus_consumer/tasks/supervise_consumer_lag_spec.rb