spec/inputs/kafka_spec.rb in logstash-input-kafka-1.0.1 vs spec/inputs/kafka_spec.rb in logstash-input-kafka-2.0.0
- old
+ new
@@ -5,12 +5,11 @@
class LogStash::Inputs::TestKafka < LogStash::Inputs::Kafka
private
def queue_event(msg, output_queue)
super(msg, output_queue)
- # need to raise exception here to stop the infinite loop
- raise LogStash::ShutdownSignal
+ do_stop
end
end
class TestMessageAndMetadata
attr_reader :topic, :partition, :key, :message
@@ -28,11 +27,30 @@
blah = TestMessageAndMetadata.new(@topic, 0, nil, 'Kafka message')
a_queue << blah
end
end
-describe 'inputs/kafka' do
+class LogStash::Inputs::TestInfiniteKafka < LogStash::Inputs::Kafka
+ private
+ def queue_event(msg, output_queue)
+ super(msg, output_queue)
+ end
+end
+
+class TestInfiniteKafkaGroup < Kafka::Group
+ def run(a_num_threads, a_queue)
+ blah = TestMessageAndMetadata.new(@topic, 0, nil, 'Kafka message')
+ Thread.new do
+ while true
+ a_queue << blah
+ sleep 0.2
+ end
+ end
+ end
+end
+
+describe LogStash::Inputs::Kafka do
let (:kafka_config) {{'topic_id' => 'test'}}
let (:empty_config) {{}}
let (:bad_kafka_config) {{'topic_id' => 'test', 'white_list' => 'other_topic'}}
let (:white_list_kafka_config) {{'white_list' => 'other_topic'}}
let (:decorated_kafka_config) {{'topic_id' => 'test', 'decorate_events' => true}}
@@ -55,10 +73,22 @@
it "should fail without topic configs" do
input = LogStash::Plugin.lookup("input", "kafka").new(bad_kafka_config)
expect {input.register}.to raise_error
end
+ it_behaves_like "an interruptible input plugin" do
+ let(:config) { kafka_config }
+ let(:mock_kafka_plugin) { LogStash::Inputs::TestInfiniteKafka.new(config) }
+
+ before :each do
+ allow(LogStash::Inputs::Kafka).to receive(:new).and_return(mock_kafka_plugin)
+ expect(subject).to receive(:create_consumer_group) do |options|
+ TestInfiniteKafkaGroup.new(options)
+ end
+ end
+ end
+
it 'should populate kafka config with default values' do
kafka = LogStash::Inputs::TestKafka.new(kafka_config)
insist {kafka.zk_connect} == 'localhost:2181'
insist {kafka.topic_id} == 'test'
insist {kafka.group_id} == 'logstash'
@@ -96,7 +126,6 @@
insist { e['kafka']['consumer_group'] } == 'logstash'
insist { e['kafka']['msg_size'] } == 13
insist { e['kafka']['partition'] } == 0
insist { e['kafka']['key'] } == nil
end
-
end