spec/unit/outputs/kafka_spec.rb in logstash-output-kafka-2.0.5 vs spec/unit/outputs/kafka_spec.rb in logstash-output-kafka-3.0.0.beta1
- old
+ new
@@ -1,9 +1,8 @@
# encoding: utf-8
require "logstash/devutils/rspec/spec_helper"
require 'logstash/outputs/kafka'
-require 'jruby-kafka'
require 'json'
describe "outputs/kafka" do
let (:simple_kafka_config) {{'topic_id' => 'test'}}
let (:event) { LogStash::Event.new({'message' => 'hello', 'topic_name' => 'my_topic', 'host' => '172.0.0.1',
@@ -23,30 +22,37 @@
end
end
context 'when outputting messages' do
it 'should send logstash event to kafka broker' do
- expect_any_instance_of(Kafka::KafkaProducer).to receive(:send_msg)
- .with(simple_kafka_config['topic_id'], nil, nil, event.to_hash.to_json)
+ expect_any_instance_of(org.apache.kafka.clients.producer.KafkaProducer).to receive(:send)
+ .with(an_instance_of(org.apache.kafka.clients.producer.ProducerRecord))
kafka = LogStash::Outputs::Kafka.new(simple_kafka_config)
kafka.register
kafka.receive(event)
end
it 'should support Event#sprintf placeholders in topic_id' do
topic_field = 'topic_name'
- expect_any_instance_of(Kafka::KafkaProducer).to receive(:send_msg)
- .with(event[topic_field], nil, nil, event.to_hash.to_json)
+ expect(org.apache.kafka.clients.producer.ProducerRecord).to receive(:new)
+ .with("my_topic", event.to_s)
+ expect_any_instance_of(org.apache.kafka.clients.producer.KafkaProducer).to receive(:send)
kafka = LogStash::Outputs::Kafka.new({'topic_id' => "%{#{topic_field}}"})
kafka.register
kafka.receive(event)
end
it 'should support field referenced message_keys' do
- expect_any_instance_of(Kafka::KafkaProducer).to receive(:send_msg)
- .with(simple_kafka_config['topic_id'], nil, event['host'], event.to_hash.to_json)
+ expect(org.apache.kafka.clients.producer.ProducerRecord).to receive(:new)
+ .with("test", "172.0.0.1", event.to_s)
+ expect_any_instance_of(org.apache.kafka.clients.producer.KafkaProducer).to receive(:send)
kafka = LogStash::Outputs::Kafka.new(simple_kafka_config.merge({"message_key" => "%{host}"}))
kafka.register
kafka.receive(event)
+ end
+
+ it 'should raise config error when truststore location is not set and ssl is enabled' do
+ kafka = LogStash::Outputs::Kafka.new(simple_kafka_config.merge({"ssl" => "true"}))
+ expect { kafka.register }.to raise_error(LogStash::ConfigurationError, /ssl_truststore_location must be set when SSL is enabled/)
end
end
end