spec/inputs/kafka_spec.rb in logstash-input-kafka-0.1.9 vs spec/inputs/kafka_spec.rb in logstash-input-kafka-0.1.10

- old
+ new

@@ -10,26 +10,55 @@ # need to raise exception here to stop the infinite loop raise LogStash::ShutdownSignal end end +class TestMessageAndMetadata + attr_reader :topic, :partition, :key, :message + def initialize(topic, partition, key, message) + @topic = topic + @partition = partition + @key = key + @message = message + end +end + class TestKafkaGroup < Kafka::Group def run(a_num_threads, a_queue) - a_queue << 'Kafka message' + blah = TestMessageAndMetadata.new(@topic, 0, nil, 'Kafka message') + a_queue << blah end end describe 'inputs/kafka' do let (:kafka_config) {{'topic_id' => 'test'}} + let (:empty_config) {{}} + let (:bad_kafka_config) {{'topic_id' => 'test', 'white_list' => 'other_topic'}} + let (:white_list_kafka_config) {{'white_list' => 'other_topic'}} let (:decorated_kafka_config) {{'topic_id' => 'test', 'decorate_events' => true}} it "should register" do input = LogStash::Plugin.lookup("input", "kafka").new(kafka_config) expect {input.register}.to_not raise_error end + it "should register with whitelist" do + input = LogStash::Plugin.lookup("input", "kafka").new(white_list_kafka_config) + expect {input.register}.to_not raise_error + end + + it "should fail with multiple topic configs" do + input = LogStash::Plugin.lookup("input", "kafka").new(empty_config) + expect {input.register}.to raise_error + end + + it "should fail without topic configs" do + input = LogStash::Plugin.lookup("input", "kafka").new(bad_kafka_config) + expect {input.register}.to raise_error + end + it 'should populate kafka config with default values' do kafka = LogStash::Inputs::TestKafka.new(kafka_config) insist {kafka.zk_connect} == 'localhost:2181' insist {kafka.topic_id} == 'test' insist {kafka.group_id} == 'logstash' @@ -64,8 +93,10 @@ insist { e['message'] } == 'Kafka message' # no metadata by default insist { e['kafka']['topic'] } == 'test' insist { e['kafka']['consumer_group'] } == 'logstash' insist { e['kafka']['msg_size'] } == 13 + insist { e['kafka']['partition'] } == 0 + insist { e['kafka']['key'] } == nil end end