spec/inputs/kafka_spec.rb in logstash-input-kafka-0.1.6 vs spec/inputs/kafka_spec.rb in logstash-input-kafka-0.1.7

- old
+ new

@@ -1,20 +1,26 @@ # encoding: utf-8 require "logstash/devutils/rspec/spec_helper" require "logstash/inputs/kafka" +require 'jruby-kafka' class LogStash::Inputs::TestKafka < LogStash::Inputs::Kafka - milestone 1 private def queue_event(msg, output_queue) super(msg, output_queue) # need to raise exception here to stop the infinite loop raise LogStash::ShutdownSignal end end +class TestKafkaGroup < Kafka::Group + def run(a_num_threads, a_queue) + a_queue << 'Kafka message' + end +end + describe 'inputs/kafka' do let (:kafka_config) {{'topic_id' => 'test'}} let (:decorated_kafka_config) {{'topic_id' => 'test', 'decorate_events' => true}} it "should register" do @@ -30,30 +36,28 @@ !insist { kafka.reset_beginning } end it 'should retrieve event from kafka' do kafka = LogStash::Inputs::TestKafka.new(kafka_config) + expect(kafka).to receive(:create_consumer_group) do |options| + TestKafkaGroup.new(options) + end kafka.register - expect_any_instance_of(Kafka::Group).to receive(:run) do |a_num_threads, a_queue| - a_queue << 'Kafka message' - end - logstash_queue = Queue.new kafka.run logstash_queue e = logstash_queue.pop insist { e['message'] } == 'Kafka message' # no metadata by default insist { e['kafka'] } == nil end it 'should retrieve a decorated event from kafka' do kafka = LogStash::Inputs::TestKafka.new(decorated_kafka_config) - kafka.register - - expect_any_instance_of(Kafka::Group).to receive(:run) do |a_num_threads, a_queue| - a_queue << 'Kafka message' + expect(kafka).to receive(:create_consumer_group) do |options| + TestKafkaGroup.new(options) end + kafka.register logstash_queue = Queue.new kafka.run logstash_queue e = logstash_queue.pop insist { e['message'] } == 'Kafka message'