spec/unit/outputs/kafka_spec.rb in logstash-output-kafka-7.3.1 vs spec/unit/outputs/kafka_spec.rb in logstash-output-kafka-7.3.2

- old
+ new

@@ -47,11 +47,11 @@ expect_any_instance_of(org.apache.kafka.clients.producer.KafkaProducer).to receive(:send).and_call_original kafka = LogStash::Outputs::Kafka.new(simple_kafka_config.merge({"message_key" => "%{host}"})) kafka.register kafka.multi_receive([event]) end - + it 'should raise config error when truststore location is not set and ssl is enabled' do kafka = LogStash::Outputs::Kafka.new(simple_kafka_config.merge("security_protocol" => "SSL")) expect { kafka.register }.to raise_error(LogStash::ConfigurationError, /ssl_truststore_location must be set when SSL is enabled/) end end @@ -118,10 +118,45 @@ kafka.register kafka.multi_receive([event]) end end + context 'when retries is 0' do + let(:retries) { 0 } + let(:max_sends) { 1 } + + it "should should only send once" do + expect_any_instance_of(org.apache.kafka.clients.producer.KafkaProducer).to receive(:send) + .once + .and_wrap_original do |m, *args| + # Always fail. + future = java.util.concurrent.FutureTask.new { raise "Failed" } + future.run + future + end + kafka = LogStash::Outputs::Kafka.new(simple_kafka_config.merge("retries" => retries)) + kafka.register + kafka.multi_receive([event]) + end + + it 'should not sleep' do + expect_any_instance_of(org.apache.kafka.clients.producer.KafkaProducer).to receive(:send) + .once + .and_wrap_original do |m, *args| + # Always fail. + future = java.util.concurrent.FutureTask.new { raise "Failed" } + future.run + future + end + + kafka = LogStash::Outputs::Kafka.new(simple_kafka_config.merge("retries" => retries)) + expect(kafka).not_to receive(:sleep).with(anything) + kafka.register + kafka.multi_receive([event]) + end + end + context "and when retries is set by the user" do let(:retries) { (rand * 10).to_i } let(:max_sends) { retries + 1 } it "should give up after retries are exhausted" do @@ -132,9 +167,24 @@ future = java.util.concurrent.FutureTask.new { raise "Failed" } future.run future end kafka = LogStash::Outputs::Kafka.new(simple_kafka_config.merge("retries" => retries)) + kafka.register + kafka.multi_receive([event]) + end + + it 'should only sleep retries number of times' do + expect_any_instance_of(org.apache.kafka.clients.producer.KafkaProducer).to receive(:send) + .at_most(max_sends) + .and_wrap_original do |m, *args| + # Always fail. + future = java.util.concurrent.FutureTask.new { raise "Failed" } + future.run + future + end + kafka = LogStash::Outputs::Kafka.new(simple_kafka_config.merge("retries" => retries)) + expect(kafka).to receive(:sleep).exactly(retries).times kafka.register kafka.multi_receive([event]) end end end