Sha256: ca7d16fe4390ed13b97df676291c65bc01e00075cc9587c7645f4c7987484fb9
Contents?: true
Size: 1.05 KB
Versions: 4
Compression:
Stored size: 1.05 KB
Contents
module NulogyMessageBusConsumer module Tasks RSpec.describe SuperviseConsumerLag do let(:killable) { spy } let(:logger) { spy } let(:tracker) { LagTracker.new(failing_checks: 2) } let(:task) { described_class.new(logger, tracker: tracker, killable: killable) } it "kills the main thread after lag does not change" do lag = { "partition-1" => 2, "partition-2" => 3 } tracker.update({"topic" => lag}) tracker.update({"topic" => lag}) consumer = instance_double( Rdkafka::Consumer, committed: nil, lag: {"topic" => lag} ) # skip waiting for assignment allow(KafkaUtils).to receive(:wait_for_assignment).with(consumer).and_return(nil) allow(Thread.current).to receive(:exit) task.extract_args(kafka_consumer: consumer) task.call expect(logger).to have_received(:warn).with(/Assigned partition lag has not changed/) expect(killable).to have_received(:kill) end end end end
Version data entries
4 entries across 4 versions & 1 rubygems