spec/inputs/kafka_spec.rb in logstash-input-kafka-0.1.9 vs spec/inputs/kafka_spec.rb in logstash-input-kafka-0.1.10
- old
+ new
@@ -10,26 +10,55 @@
# need to raise exception here to stop the infinite loop
raise LogStash::ShutdownSignal
end
end
+class TestMessageAndMetadata
+ attr_reader :topic, :partition, :key, :message
+ def initialize(topic, partition, key, message)
+ @topic = topic
+ @partition = partition
+ @key = key
+ @message = message
+ end
+end
+
class TestKafkaGroup < Kafka::Group
def run(a_num_threads, a_queue)
- a_queue << 'Kafka message'
+ blah = TestMessageAndMetadata.new(@topic, 0, nil, 'Kafka message')
+ a_queue << blah
end
end
describe 'inputs/kafka' do
let (:kafka_config) {{'topic_id' => 'test'}}
+ let (:empty_config) {{}}
+ let (:bad_kafka_config) {{'topic_id' => 'test', 'white_list' => 'other_topic'}}
+ let (:white_list_kafka_config) {{'white_list' => 'other_topic'}}
let (:decorated_kafka_config) {{'topic_id' => 'test', 'decorate_events' => true}}
it "should register" do
input = LogStash::Plugin.lookup("input", "kafka").new(kafka_config)
expect {input.register}.to_not raise_error
end
+ it "should register with whitelist" do
+ input = LogStash::Plugin.lookup("input", "kafka").new(white_list_kafka_config)
+ expect {input.register}.to_not raise_error
+ end
+
+ it "should fail with multiple topic configs" do
+ input = LogStash::Plugin.lookup("input", "kafka").new(empty_config)
+ expect {input.register}.to raise_error
+ end
+
+ it "should fail without topic configs" do
+ input = LogStash::Plugin.lookup("input", "kafka").new(bad_kafka_config)
+ expect {input.register}.to raise_error
+ end
+
it 'should populate kafka config with default values' do
kafka = LogStash::Inputs::TestKafka.new(kafka_config)
insist {kafka.zk_connect} == 'localhost:2181'
insist {kafka.topic_id} == 'test'
insist {kafka.group_id} == 'logstash'
@@ -64,8 +93,10 @@
insist { e['message'] } == 'Kafka message'
# no metadata by default
insist { e['kafka']['topic'] } == 'test'
insist { e['kafka']['consumer_group'] } == 'logstash'
insist { e['kafka']['msg_size'] } == 13
+ insist { e['kafka']['partition'] } == 0
+ insist { e['kafka']['key'] } == nil
end
end