spec/unit/outputs/kafka_spec.rb in logstash-output-kafka-7.3.1 vs spec/unit/outputs/kafka_spec.rb in logstash-output-kafka-7.3.2
- old
+ new
@@ -47,11 +47,11 @@
expect_any_instance_of(org.apache.kafka.clients.producer.KafkaProducer).to receive(:send).and_call_original
kafka = LogStash::Outputs::Kafka.new(simple_kafka_config.merge({"message_key" => "%{host}"}))
kafka.register
kafka.multi_receive([event])
end
-
+
it 'should raise config error when truststore location is not set and ssl is enabled' do
kafka = LogStash::Outputs::Kafka.new(simple_kafka_config.merge("security_protocol" => "SSL"))
expect { kafka.register }.to raise_error(LogStash::ConfigurationError, /ssl_truststore_location must be set when SSL is enabled/)
end
end
@@ -118,10 +118,45 @@
kafka.register
kafka.multi_receive([event])
end
end
+ context 'when retries is 0' do
+ let(:retries) { 0 }
+ let(:max_sends) { 1 }
+
+ it "should should only send once" do
+ expect_any_instance_of(org.apache.kafka.clients.producer.KafkaProducer).to receive(:send)
+ .once
+ .and_wrap_original do |m, *args|
+ # Always fail.
+ future = java.util.concurrent.FutureTask.new { raise "Failed" }
+ future.run
+ future
+ end
+ kafka = LogStash::Outputs::Kafka.new(simple_kafka_config.merge("retries" => retries))
+ kafka.register
+ kafka.multi_receive([event])
+ end
+
+ it 'should not sleep' do
+ expect_any_instance_of(org.apache.kafka.clients.producer.KafkaProducer).to receive(:send)
+ .once
+ .and_wrap_original do |m, *args|
+ # Always fail.
+ future = java.util.concurrent.FutureTask.new { raise "Failed" }
+ future.run
+ future
+ end
+
+ kafka = LogStash::Outputs::Kafka.new(simple_kafka_config.merge("retries" => retries))
+ expect(kafka).not_to receive(:sleep).with(anything)
+ kafka.register
+ kafka.multi_receive([event])
+ end
+ end
+
context "and when retries is set by the user" do
let(:retries) { (rand * 10).to_i }
let(:max_sends) { retries + 1 }
it "should give up after retries are exhausted" do
@@ -132,9 +167,24 @@
future = java.util.concurrent.FutureTask.new { raise "Failed" }
future.run
future
end
kafka = LogStash::Outputs::Kafka.new(simple_kafka_config.merge("retries" => retries))
+ kafka.register
+ kafka.multi_receive([event])
+ end
+
+ it 'should only sleep retries number of times' do
+ expect_any_instance_of(org.apache.kafka.clients.producer.KafkaProducer).to receive(:send)
+ .at_most(max_sends)
+ .and_wrap_original do |m, *args|
+ # Always fail.
+ future = java.util.concurrent.FutureTask.new { raise "Failed" }
+ future.run
+ future
+ end
+ kafka = LogStash::Outputs::Kafka.new(simple_kafka_config.merge("retries" => retries))
+ expect(kafka).to receive(:sleep).exactly(retries).times
kafka.register
kafka.multi_receive([event])
end
end
end