spec/unit/outputs/kafka_spec.rb in logstash-output-kafka-6.2.4 vs spec/unit/outputs/kafka_spec.rb in logstash-output-kafka-7.0.0

- old
+ new

@@ -23,170 +23,36 @@ end context 'when outputting messages' do it 'should send logstash event to kafka broker' do expect_any_instance_of(org.apache.kafka.clients.producer.KafkaProducer).to receive(:send) - .with(an_instance_of(org.apache.kafka.clients.producer.ProducerRecord)).and_call_original + .with(an_instance_of(org.apache.kafka.clients.producer.ProducerRecord)) kafka = LogStash::Outputs::Kafka.new(simple_kafka_config) kafka.register - kafka.multi_receive([event]) + kafka.receive(event) end it 'should support Event#sprintf placeholders in topic_id' do topic_field = 'topic_name' expect(org.apache.kafka.clients.producer.ProducerRecord).to receive(:new) - .with("my_topic", event.to_s).and_call_original - expect_any_instance_of(org.apache.kafka.clients.producer.KafkaProducer).to receive(:send).and_call_original + .with("my_topic", event.to_s) + expect_any_instance_of(org.apache.kafka.clients.producer.KafkaProducer).to receive(:send) kafka = LogStash::Outputs::Kafka.new({'topic_id' => "%{#{topic_field}}"}) kafka.register - kafka.multi_receive([event]) + kafka.receive(event) end it 'should support field referenced message_keys' do expect(org.apache.kafka.clients.producer.ProducerRecord).to receive(:new) - .with("test", "172.0.0.1", event.to_s).and_call_original - expect_any_instance_of(org.apache.kafka.clients.producer.KafkaProducer).to receive(:send).and_call_original + .with("test", "172.0.0.1", event.to_s) + expect_any_instance_of(org.apache.kafka.clients.producer.KafkaProducer).to receive(:send) kafka = LogStash::Outputs::Kafka.new(simple_kafka_config.merge({"message_key" => "%{host}"})) kafka.register - kafka.multi_receive([event]) + kafka.receive(event) end - + it 'should raise config error when truststore location is not set and ssl is enabled' do - kafka = LogStash::Outputs::Kafka.new(simple_kafka_config.merge("security_protocol" => "SSL")) + kafka = LogStash::Outputs::Kafka.new(simple_kafka_config.merge({"ssl" => "true"})) expect { kafka.register }.to raise_error(LogStash::ConfigurationError, /ssl_truststore_location must be set when SSL is enabled/) - end - end - - context "when KafkaProducer#send() raises an exception" do - let(:failcount) { (rand * 10).to_i } - let(:sendcount) { failcount + 1 } - - let(:exception_classes) { [ - org.apache.kafka.common.errors.TimeoutException, - org.apache.kafka.common.errors.InterruptException, - org.apache.kafka.common.errors.SerializationException - ] } - - before do - count = 0 - expect_any_instance_of(org.apache.kafka.clients.producer.KafkaProducer).to receive(:send) - .exactly(sendcount).times - .and_wrap_original do |m, *args| - if count < failcount # fail 'failcount' times in a row. - count += 1 - # Pick an exception at random - raise exception_classes.shuffle.first.new("injected exception for testing") - else - m.call(*args) # call original - end - end - end - - it "should retry until successful" do - kafka = LogStash::Outputs::Kafka.new(simple_kafka_config) - kafka.register - kafka.multi_receive([event]) - end - end - - context "when a send fails" do - context "and the default retries behavior is used" do - # Fail this many times and then finally succeed. - let(:failcount) { (rand * 10).to_i } - - # Expect KafkaProducer.send() to get called again after every failure, plus the successful one. - let(:sendcount) { failcount + 1 } - - it "should retry until successful" do - count = 0; - - expect_any_instance_of(org.apache.kafka.clients.producer.KafkaProducer).to receive(:send) - .exactly(sendcount).times - .and_wrap_original do |m, *args| - if count < failcount - count += 1 - # inject some failures. - - # Return a custom Future that will raise an exception to simulate a Kafka send() problem. - future = java.util.concurrent.FutureTask.new { raise "Failed" } - future.run - future - else - m.call(*args) - end - end - kafka = LogStash::Outputs::Kafka.new(simple_kafka_config) - kafka.register - kafka.multi_receive([event]) - end - end - - context 'when retries is 0' do - let(:retries) { 0 } - let(:max_sends) { 1 } - - it "should should only send once" do - expect_any_instance_of(org.apache.kafka.clients.producer.KafkaProducer).to receive(:send) - .once - .and_wrap_original do |m, *args| - # Always fail. - future = java.util.concurrent.FutureTask.new { raise "Failed" } - future.run - future - end - kafka = LogStash::Outputs::Kafka.new(simple_kafka_config.merge("retries" => retries)) - kafka.register - kafka.multi_receive([event]) - end - - it 'should not sleep' do - expect_any_instance_of(org.apache.kafka.clients.producer.KafkaProducer).to receive(:send) - .once - .and_wrap_original do |m, *args| - # Always fail. - future = java.util.concurrent.FutureTask.new { raise "Failed" } - future.run - future - end - - kafka = LogStash::Outputs::Kafka.new(simple_kafka_config.merge("retries" => retries)) - expect(kafka).not_to receive(:sleep).with(anything) - kafka.register - kafka.multi_receive([event]) - end - end - - context "and when retries is set by the user" do - let(:retries) { (rand * 10).to_i } - let(:max_sends) { retries + 1 } - - it "should give up after retries are exhausted" do - expect_any_instance_of(org.apache.kafka.clients.producer.KafkaProducer).to receive(:send) - .at_most(max_sends).times - .and_wrap_original do |m, *args| - # Always fail. - future = java.util.concurrent.FutureTask.new { raise "Failed" } - future.run - future - end - kafka = LogStash::Outputs::Kafka.new(simple_kafka_config.merge("retries" => retries)) - kafka.register - kafka.multi_receive([event]) - end - - it 'should only sleep retries number of times' do - expect_any_instance_of(org.apache.kafka.clients.producer.KafkaProducer).to receive(:send) - .at_most(max_sends) - .and_wrap_original do |m, *args| - # Always fail. - future = java.util.concurrent.FutureTask.new { raise "Failed" } - future.run - future - end - kafka = LogStash::Outputs::Kafka.new(simple_kafka_config.merge("retries" => retries)) - expect(kafka).to receive(:sleep).exactly(retries).times - kafka.register - kafka.multi_receive([event]) - end end end end