spec/unit/outputs/kafka_spec.rb in logstash-output-kafka-6.2.2 vs spec/unit/outputs/kafka_spec.rb in logstash-output-kafka-6.2.4

- old
+ new

@@ -23,36 +23,170 @@ end context 'when outputting messages' do it 'should send logstash event to kafka broker' do expect_any_instance_of(org.apache.kafka.clients.producer.KafkaProducer).to receive(:send) - .with(an_instance_of(org.apache.kafka.clients.producer.ProducerRecord)) + .with(an_instance_of(org.apache.kafka.clients.producer.ProducerRecord)).and_call_original kafka = LogStash::Outputs::Kafka.new(simple_kafka_config) kafka.register - kafka.receive(event) + kafka.multi_receive([event]) end it 'should support Event#sprintf placeholders in topic_id' do topic_field = 'topic_name' expect(org.apache.kafka.clients.producer.ProducerRecord).to receive(:new) - .with("my_topic", event.to_s) - expect_any_instance_of(org.apache.kafka.clients.producer.KafkaProducer).to receive(:send) + .with("my_topic", event.to_s).and_call_original + expect_any_instance_of(org.apache.kafka.clients.producer.KafkaProducer).to receive(:send).and_call_original kafka = LogStash::Outputs::Kafka.new({'topic_id' => "%{#{topic_field}}"}) kafka.register - kafka.receive(event) + kafka.multi_receive([event]) end it 'should support field referenced message_keys' do expect(org.apache.kafka.clients.producer.ProducerRecord).to receive(:new) - .with("test", "172.0.0.1", event.to_s) - expect_any_instance_of(org.apache.kafka.clients.producer.KafkaProducer).to receive(:send) + .with("test", "172.0.0.1", event.to_s).and_call_original + expect_any_instance_of(org.apache.kafka.clients.producer.KafkaProducer).to receive(:send).and_call_original kafka = LogStash::Outputs::Kafka.new(simple_kafka_config.merge({"message_key" => "%{host}"})) kafka.register - kafka.receive(event) + kafka.multi_receive([event]) end - + it 'should raise config error when truststore location is not set and ssl is enabled' do - kafka = LogStash::Outputs::Kafka.new(simple_kafka_config.merge({"ssl" => "true"})) + kafka = LogStash::Outputs::Kafka.new(simple_kafka_config.merge("security_protocol" => "SSL")) expect { kafka.register }.to raise_error(LogStash::ConfigurationError, /ssl_truststore_location must be set when SSL is enabled/) + end + end + + context "when KafkaProducer#send() raises an exception" do + let(:failcount) { (rand * 10).to_i } + let(:sendcount) { failcount + 1 } + + let(:exception_classes) { [ + org.apache.kafka.common.errors.TimeoutException, + org.apache.kafka.common.errors.InterruptException, + org.apache.kafka.common.errors.SerializationException + ] } + + before do + count = 0 + expect_any_instance_of(org.apache.kafka.clients.producer.KafkaProducer).to receive(:send) + .exactly(sendcount).times + .and_wrap_original do |m, *args| + if count < failcount # fail 'failcount' times in a row. + count += 1 + # Pick an exception at random + raise exception_classes.shuffle.first.new("injected exception for testing") + else + m.call(*args) # call original + end + end + end + + it "should retry until successful" do + kafka = LogStash::Outputs::Kafka.new(simple_kafka_config) + kafka.register + kafka.multi_receive([event]) + end + end + + context "when a send fails" do + context "and the default retries behavior is used" do + # Fail this many times and then finally succeed. + let(:failcount) { (rand * 10).to_i } + + # Expect KafkaProducer.send() to get called again after every failure, plus the successful one. + let(:sendcount) { failcount + 1 } + + it "should retry until successful" do + count = 0; + + expect_any_instance_of(org.apache.kafka.clients.producer.KafkaProducer).to receive(:send) + .exactly(sendcount).times + .and_wrap_original do |m, *args| + if count < failcount + count += 1 + # inject some failures. + + # Return a custom Future that will raise an exception to simulate a Kafka send() problem. + future = java.util.concurrent.FutureTask.new { raise "Failed" } + future.run + future + else + m.call(*args) + end + end + kafka = LogStash::Outputs::Kafka.new(simple_kafka_config) + kafka.register + kafka.multi_receive([event]) + end + end + + context 'when retries is 0' do + let(:retries) { 0 } + let(:max_sends) { 1 } + + it "should should only send once" do + expect_any_instance_of(org.apache.kafka.clients.producer.KafkaProducer).to receive(:send) + .once + .and_wrap_original do |m, *args| + # Always fail. + future = java.util.concurrent.FutureTask.new { raise "Failed" } + future.run + future + end + kafka = LogStash::Outputs::Kafka.new(simple_kafka_config.merge("retries" => retries)) + kafka.register + kafka.multi_receive([event]) + end + + it 'should not sleep' do + expect_any_instance_of(org.apache.kafka.clients.producer.KafkaProducer).to receive(:send) + .once + .and_wrap_original do |m, *args| + # Always fail. + future = java.util.concurrent.FutureTask.new { raise "Failed" } + future.run + future + end + + kafka = LogStash::Outputs::Kafka.new(simple_kafka_config.merge("retries" => retries)) + expect(kafka).not_to receive(:sleep).with(anything) + kafka.register + kafka.multi_receive([event]) + end + end + + context "and when retries is set by the user" do + let(:retries) { (rand * 10).to_i } + let(:max_sends) { retries + 1 } + + it "should give up after retries are exhausted" do + expect_any_instance_of(org.apache.kafka.clients.producer.KafkaProducer).to receive(:send) + .at_most(max_sends).times + .and_wrap_original do |m, *args| + # Always fail. + future = java.util.concurrent.FutureTask.new { raise "Failed" } + future.run + future + end + kafka = LogStash::Outputs::Kafka.new(simple_kafka_config.merge("retries" => retries)) + kafka.register + kafka.multi_receive([event]) + end + + it 'should only sleep retries number of times' do + expect_any_instance_of(org.apache.kafka.clients.producer.KafkaProducer).to receive(:send) + .at_most(max_sends) + .and_wrap_original do |m, *args| + # Always fail. + future = java.util.concurrent.FutureTask.new { raise "Failed" } + future.run + future + end + kafka = LogStash::Outputs::Kafka.new(simple_kafka_config.merge("retries" => retries)) + expect(kafka).to receive(:sleep).exactly(retries).times + kafka.register + kafka.multi_receive([event]) + end end end end