spec/outputs/kafka_spec.rb in logstash-output-kafka-0.1.6 vs spec/outputs/kafka_spec.rb in logstash-output-kafka-0.1.7

- old
+ new

@@ -1,15 +1,14 @@ # encoding: utf-8 require "logstash/devutils/rspec/spec_helper" require 'logstash/outputs/kafka' -require 'logstash-output-kafka_jars' require 'jruby-kafka' require 'json' describe "outputs/kafka" do let (:simple_kafka_config) {{'topic_id' => 'test'}} - let (:event) { LogStash::Event.new({'message' => 'hello', 'topic_name' => 'my_topic', + let (:event) { LogStash::Event.new({'message' => 'hello', 'topic_name' => 'my_topic', 'host' => '172.0.0.1', '@timestamp' => LogStash::Timestamp.now}) } context 'when initializing' do it "should register" do output = LogStash::Plugin.lookup("output", "kafka").new(simple_kafka_config) @@ -39,9 +38,19 @@ it 'should support Event#sprintf placeholders in topic_id' do topic_field = 'topic_name' expect_any_instance_of(Kafka::Producer).to receive(:send_msg) .with(event[topic_field], nil, event.to_hash.to_json) kafka = LogStash::Outputs::Kafka.new({'topic_id' => "%{#{topic_field}}"}) + kafka.register + kafka.receive(event) + end + + it 'should support Event#sprintf placeholders in partition_key_format' do + partition_field = 'host' + expect_any_instance_of(Kafka::Producer).to receive(:send_msg) + .with(simple_kafka_config['topic_id'], event[partition_field], event.to_hash.to_json) + kafka = LogStash::Outputs::Kafka.new({'topic_id' => simple_kafka_config['topic_id'], + 'partition_key_format' => "%{#{partition_field}}"}) kafka.register kafka.receive(event) end end end