spec/inputs/kafka_spec.rb in logstash-input-kafka-0.1.3 vs spec/inputs/kafka_spec.rb in logstash-input-kafka-0.1.5

- old
+ new

@@ -1,44 +1,41 @@ # encoding: utf-8 require "logstash/devutils/rspec/spec_helper" +require "logstash/inputs/kafka" -describe 'inputs/kafka' do +class LogStash::Inputs::TestKafka < LogStash::Inputs::Kafka + milestone 1 + private + def queue_event(msg, output_queue) + super(msg, output_queue) + # need to raise exception here to stop the infinite loop + raise LogStash::ShutdownSignal + end +end + +describe 'inputs/kafka' do let (:kafka_config) {{'topic_id' => 'test'}} it "should register" do input = LogStash::Plugin.lookup("input", "kafka").new(kafka_config) expect {input.register}.to_not raise_error end it 'should populate kafka config with default values' do - kafka = LogStash::Inputs::Kafka.new(kafka_config) + kafka = LogStash::Inputs::TestKafka.new(kafka_config) insist {kafka.zk_connect} == 'localhost:2181' insist {kafka.topic_id} == 'test' insist {kafka.group_id} == 'logstash' !insist { kafka.reset_beginning } end it 'should retrieve event from kafka' do - # Extend class to control behavior - class LogStash::Inputs::TestKafka < LogStash::Inputs::Kafka - milestone 1 - private - def queue_event(msg, output_queue) - super(msg, output_queue) - # need to raise exception here to stop the infinite loop - raise LogStash::ShutdownSignal - end - end - kafka = LogStash::Inputs::TestKafka.new(kafka_config) kafka.register - class Kafka::Group - public - def run(a_num_threads, a_queue) - a_queue << 'Kafka message' - end + expect_any_instance_of(Kafka::Group).to receive(:run) do |a_num_threads, a_queue| + a_queue << 'Kafka message' end logstash_queue = Queue.new kafka.run logstash_queue e = logstash_queue.pop