spec/outputs/kafka_spec.rb in logstash-output-kafka-0.1.6 vs spec/outputs/kafka_spec.rb in logstash-output-kafka-0.1.7
- old
+ new
@@ -1,15 +1,14 @@
# encoding: utf-8
require "logstash/devutils/rspec/spec_helper"
require 'logstash/outputs/kafka'
-require 'logstash-output-kafka_jars'
require 'jruby-kafka'
require 'json'

describe "outputs/kafka" do
  let (:simple_kafka_config) {{'topic_id' => 'test'}}
-  let (:event) { LogStash::Event.new({'message' => 'hello', 'topic_name' => 'my_topic',
+  let (:event) { LogStash::Event.new({'message' => 'hello', 'topic_name' => 'my_topic', 'host' => '172.0.0.1',
                                       '@timestamp' => LogStash::Timestamp.now}) }

  context 'when initializing' do
    it "should register" do
      output = LogStash::Plugin.lookup("output", "kafka").new(simple_kafka_config)
@@ -39,9 +38,19 @@
    it 'should support Event#sprintf placeholders in topic_id' do
      topic_field = 'topic_name'
      expect_any_instance_of(Kafka::Producer).to receive(:send_msg)
        .with(event[topic_field], nil, event.to_hash.to_json)
      kafka = LogStash::Outputs::Kafka.new({'topic_id' => "%{#{topic_field}}"})
+      kafka.register
+      kafka.receive(event)
+    end
+
+    it 'should support Event#sprintf placeholders in partition_key_format' do
+      partition_field = 'host'
+      expect_any_instance_of(Kafka::Producer).to receive(:send_msg)
+        .with(simple_kafka_config['topic_id'], event[partition_field], event.to_hash.to_json)
+      kafka = LogStash::Outputs::Kafka.new({'topic_id' => simple_kafka_config['topic_id'],
+                                            'partition_key_format' => "%{#{partition_field}}"})
      kafka.register
      kafka.receive(event)
    end
  end
end
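
For orientation, here is a minimal sketch reconstructed from the spec expectations above, not from the plugin source. It assumes the plugin renders the 'topic_id' and 'partition_key_format' settings with Event#sprintf for each event before handing the message to the producer, as the expectation on Kafka::Producer#send_msg implies.

# Sketch only: mirrors what the new spec asserts about the output plugin.
require 'logstash/outputs/kafka'

event = LogStash::Event.new({'message' => 'hello', 'topic_name' => 'my_topic', 'host' => '172.0.0.1',
                             '@timestamp' => LogStash::Timestamp.now})

kafka = LogStash::Outputs::Kafka.new({'topic_id'             => 'test',
                                      'partition_key_format' => '%{host}'})
kafka.register

# Per the new expectation, this results in
#   Kafka::Producer#send_msg('test', '172.0.0.1', event.to_hash.to_json)
# i.e. the rendered 'host' field becomes the Kafka partition key.
kafka.receive(event)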