spec/outputs/kafka_spec.rb in logstash-output-kafka-0.1.3 vs spec/outputs/kafka_spec.rb in logstash-output-kafka-0.1.4

- old
+ new

@@ -1,36 +1,48 @@ # encoding: utf-8 require "logstash/devutils/rspec/spec_helper" require 'logstash/outputs/kafka' require 'logstash-output-kafka_jars' require 'jruby-kafka' +require 'json' describe "outputs/kafka" do - let (:kafka_config) {{'topic_id' => 'test'}} + let (:simple_kafka_config) {{'topic_id' => 'test'}} + let (:event) { LogStash::Event.new({'message' => 'hello', 'topic_name' => 'my_topic', + '@timestamp' => LogStash::Timestamp.now}) } - it "should register" do - output = LogStash::Plugin.lookup("output", "kafka").new(kafka_config) - expect {output.register}.to_not raise_error - end + context 'when initializing' do + it "should register" do + output = LogStash::Plugin.lookup("output", "kafka").new(simple_kafka_config) + expect {output.register}.to_not raise_error + end - it 'should populate kafka config with default values' do - kafka = LogStash::Outputs::Kafka.new(kafka_config) - insist {kafka.broker_list} == 'localhost:9092' - insist {kafka.topic_id} == 'test' - insist {kafka.compression_codec} == 'none' - insist {kafka.serializer_class} == 'kafka.serializer.StringEncoder' - insist {kafka.partitioner_class} == 'kafka.producer.DefaultPartitioner' - insist {kafka.producer_type} == 'sync' + it 'should populate kafka config with default values' do + kafka = LogStash::Outputs::Kafka.new(simple_kafka_config) + insist {kafka.broker_list} == 'localhost:9092' + insist {kafka.topic_id} == 'test' + insist {kafka.compression_codec} == 'none' + insist {kafka.serializer_class} == 'kafka.serializer.StringEncoder' + insist {kafka.partitioner_class} == 'kafka.producer.DefaultPartitioner' + insist {kafka.producer_type} == 'sync' + end end - it 'should send logstash event to kafka broker' do - timestamp = LogStash::Timestamp.now - expect_any_instance_of(Kafka::Producer) - .to receive(:send_msg) - .with('test', nil, "{\"message\":\"hello world\",\"host\":\"test\",\"@timestamp\":\"#{timestamp}\",\"@version\":\"1\"}") - e = LogStash::Event.new({:message => 'hello world', :host => 'test', '@timestamp' => timestamp}) - kafka = LogStash::Outputs::Kafka.new(kafka_config) - kafka.register - kafka.receive(e) - end + context 'when outputting messages' do + it 'should send logstash event to kafka broker' do + expect_any_instance_of(Kafka::Producer).to receive(:send_msg) + .with(simple_kafka_config['topic_id'], nil, event.to_hash.to_json) + kafka = LogStash::Outputs::Kafka.new(simple_kafka_config) + kafka.register + kafka.receive(event) + end + it 'should support Event#sprintf placeholders in topic_id' do + topic_field = 'topic_name' + expect_any_instance_of(Kafka::Producer).to receive(:send_msg) + .with(event[topic_field], nil, event.to_hash.to_json) + kafka = LogStash::Outputs::Kafka.new({'topic_id' => "%{#{topic_field}}"}) + kafka.register + kafka.receive(event) + end + end end