spec/inputs/kafka_spec.rb in logstash-input-kafka-1.0.1 vs spec/inputs/kafka_spec.rb in logstash-input-kafka-2.0.0

- old
+ new

@@ -5,12 +5,11 @@ class LogStash::Inputs::TestKafka < LogStash::Inputs::Kafka private def queue_event(msg, output_queue) super(msg, output_queue) - # need to raise exception here to stop the infinite loop - raise LogStash::ShutdownSignal + do_stop end end class TestMessageAndMetadata attr_reader :topic, :partition, :key, :message @@ -28,11 +27,30 @@ blah = TestMessageAndMetadata.new(@topic, 0, nil, 'Kafka message') a_queue << blah end end -describe 'inputs/kafka' do +class LogStash::Inputs::TestInfiniteKafka < LogStash::Inputs::Kafka + private + def queue_event(msg, output_queue) + super(msg, output_queue) + end +end + +class TestInfiniteKafkaGroup < Kafka::Group + def run(a_num_threads, a_queue) + blah = TestMessageAndMetadata.new(@topic, 0, nil, 'Kafka message') + Thread.new do + while true + a_queue << blah + sleep 0.2 + end + end + end +end + +describe LogStash::Inputs::Kafka do let (:kafka_config) {{'topic_id' => 'test'}} let (:empty_config) {{}} let (:bad_kafka_config) {{'topic_id' => 'test', 'white_list' => 'other_topic'}} let (:white_list_kafka_config) {{'white_list' => 'other_topic'}} let (:decorated_kafka_config) {{'topic_id' => 'test', 'decorate_events' => true}} @@ -55,10 +73,22 @@ it "should fail without topic configs" do input = LogStash::Plugin.lookup("input", "kafka").new(bad_kafka_config) expect {input.register}.to raise_error end + it_behaves_like "an interruptible input plugin" do + let(:config) { kafka_config } + let(:mock_kafka_plugin) { LogStash::Inputs::TestInfiniteKafka.new(config) } + + before :each do + allow(LogStash::Inputs::Kafka).to receive(:new).and_return(mock_kafka_plugin) + expect(subject).to receive(:create_consumer_group) do |options| + TestInfiniteKafkaGroup.new(options) + end + end + end + it 'should populate kafka config with default values' do kafka = LogStash::Inputs::TestKafka.new(kafka_config) insist {kafka.zk_connect} == 'localhost:2181' insist {kafka.topic_id} == 'test' insist {kafka.group_id} == 'logstash' @@ -96,7 +126,6 @@ insist { e['kafka']['consumer_group'] } == 'logstash' insist { e['kafka']['msg_size'] } == 13 insist { e['kafka']['partition'] } == 0 insist { e['kafka']['key'] } == nil end - end