spec/outputs/kafka_spec.rb in logstash-output-kafka-0.1.3 vs spec/outputs/kafka_spec.rb in logstash-output-kafka-0.1.4
- old (lines from logstash-output-kafka-0.1.3)
+ new (lines from logstash-output-kafka-0.1.4)
@@ -1,36 +1,48 @@
# encoding: utf-8
require "logstash/devutils/rspec/spec_helper"
require 'logstash/outputs/kafka'
require 'logstash-output-kafka_jars'
require 'jruby-kafka'
+require 'json'
describe "outputs/kafka" do
-   let (:kafka_config) {{'topic_id' => 'test'}}
+   let (:simple_kafka_config) {{'topic_id' => 'test'}}
+   let (:event) { LogStash::Event.new({'message' => 'hello', 'topic_name' => 'my_topic',
+                                       '@timestamp' => LogStash::Timestamp.now}) }
- it "should register" do
- output = LogStash::Plugin.lookup("output", "kafka").new(kafka_config)
- expect {output.register}.to_not raise_error
- end
+ context 'when initializing' do
+ it "should register" do
+ output = LogStash::Plugin.lookup("output", "kafka").new(simple_kafka_config)
+ expect {output.register}.to_not raise_error
+ end
-   it 'should populate kafka config with default values' do
-     kafka = LogStash::Outputs::Kafka.new(kafka_config)
-     insist {kafka.broker_list} == 'localhost:9092'
-     insist {kafka.topic_id} == 'test'
-     insist {kafka.compression_codec} == 'none'
-     insist {kafka.serializer_class} == 'kafka.serializer.StringEncoder'
-     insist {kafka.partitioner_class} == 'kafka.producer.DefaultPartitioner'
-     insist {kafka.producer_type} == 'sync'
+     it 'should populate kafka config with default values' do
+       kafka = LogStash::Outputs::Kafka.new(simple_kafka_config)
+       insist {kafka.broker_list} == 'localhost:9092'
+       insist {kafka.topic_id} == 'test'
+       insist {kafka.compression_codec} == 'none'
+       insist {kafka.serializer_class} == 'kafka.serializer.StringEncoder'
+       insist {kafka.partitioner_class} == 'kafka.producer.DefaultPartitioner'
+       insist {kafka.producer_type} == 'sync'
+     end
  end
-   it 'should send logstash event to kafka broker' do
-     timestamp = LogStash::Timestamp.now
-     expect_any_instance_of(Kafka::Producer)
-       .to receive(:send_msg)
-       .with('test', nil, "{\"message\":\"hello world\",\"host\":\"test\",\"@timestamp\":\"#{timestamp}\",\"@version\":\"1\"}")
-     e = LogStash::Event.new({:message => 'hello world', :host => 'test', '@timestamp' => timestamp})
-     kafka = LogStash::Outputs::Kafka.new(kafka_config)
-     kafka.register
-     kafka.receive(e)
-   end
+   context 'when outputting messages' do
+     it 'should send logstash event to kafka broker' do
+       expect_any_instance_of(Kafka::Producer).to receive(:send_msg)
+         .with(simple_kafka_config['topic_id'], nil, event.to_hash.to_json)
+       kafka = LogStash::Outputs::Kafka.new(simple_kafka_config)
+       kafka.register
+       kafka.receive(event)
+     end
+     it 'should support Event#sprintf placeholders in topic_id' do
+       topic_field = 'topic_name'
+       expect_any_instance_of(Kafka::Producer).to receive(:send_msg)
+         .with(event[topic_field], nil, event.to_hash.to_json)
+       kafka = LogStash::Outputs::Kafka.new({'topic_id' => "%{#{topic_field}}"})
+       kafka.register
+       kafka.receive(event)
+     end
+   end
end
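
The new "Event#sprintf placeholders" example is the user-visible change covered by this spec revision: the configured topic_id is resolved against each event, so a field reference can pick the destination topic per event. A minimal usage sketch in the same spirit as the spec follows; the topic_name field, the %{...} placeholder value, and a broker reachable at the default broker_list (localhost:9092) are illustrative assumptions, not part of the diff above.

# Sketch only: shows the per-event topic routing that the new expectation asserts.
require 'logstash/outputs/kafka'

kafka = LogStash::Outputs::Kafka.new({'topic_id' => '%{topic_name}'})
kafka.register

# With a placeholder topic_id, this event should be published to 'my_topic'
# (the value of its own 'topic_name' field) rather than to a fixed topic.
# Assumes a Kafka broker is running at the default broker_list, localhost:9092.
kafka.receive(LogStash::Event.new({'message' => 'hello', 'topic_name' => 'my_topic'}))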