spec/unit/outputs/kafka_spec.rb in logstash-output-kafka-2.0.5 vs spec/unit/outputs/kafka_spec.rb in logstash-output-kafka-3.0.0.beta1

- old
+ new

@@ -1,9 +1,8 @@ # encoding: utf-8 require "logstash/devutils/rspec/spec_helper" require 'logstash/outputs/kafka' -require 'jruby-kafka' require 'json' describe "outputs/kafka" do let (:simple_kafka_config) {{'topic_id' => 'test'}} let (:event) { LogStash::Event.new({'message' => 'hello', 'topic_name' => 'my_topic', 'host' => '172.0.0.1', @@ -23,30 +22,37 @@ end end context 'when outputting messages' do it 'should send logstash event to kafka broker' do - expect_any_instance_of(Kafka::KafkaProducer).to receive(:send_msg) - .with(simple_kafka_config['topic_id'], nil, nil, event.to_hash.to_json) + expect_any_instance_of(org.apache.kafka.clients.producer.KafkaProducer).to receive(:send) + .with(an_instance_of(org.apache.kafka.clients.producer.ProducerRecord)) kafka = LogStash::Outputs::Kafka.new(simple_kafka_config) kafka.register kafka.receive(event) end it 'should support Event#sprintf placeholders in topic_id' do topic_field = 'topic_name' - expect_any_instance_of(Kafka::KafkaProducer).to receive(:send_msg) - .with(event[topic_field], nil, nil, event.to_hash.to_json) + expect(org.apache.kafka.clients.producer.ProducerRecord).to receive(:new) + .with("my_topic", event.to_s) + expect_any_instance_of(org.apache.kafka.clients.producer.KafkaProducer).to receive(:send) kafka = LogStash::Outputs::Kafka.new({'topic_id' => "%{#{topic_field}}"}) kafka.register kafka.receive(event) end it 'should support field referenced message_keys' do - expect_any_instance_of(Kafka::KafkaProducer).to receive(:send_msg) - .with(simple_kafka_config['topic_id'], nil, event['host'], event.to_hash.to_json) + expect(org.apache.kafka.clients.producer.ProducerRecord).to receive(:new) + .with("test", "172.0.0.1", event.to_s) + expect_any_instance_of(org.apache.kafka.clients.producer.KafkaProducer).to receive(:send) kafka = LogStash::Outputs::Kafka.new(simple_kafka_config.merge({"message_key" => "%{host}"})) kafka.register kafka.receive(event) + end + + it 'should raise config error when truststore location is not set and ssl is enabled' do + kafka = LogStash::Outputs::Kafka.new(simple_kafka_config.merge({"ssl" => "true"})) + expect { kafka.register }.to raise_error(LogStash::ConfigurationError, /ssl_truststore_location must be set when SSL is enabled/) end end end