spec/inputs/kafka_spec.rb in logstash-input-kafka-0.1.6 vs spec/inputs/kafka_spec.rb in logstash-input-kafka-0.1.7
- old
+ new
@@ -1,20 +1,26 @@
# encoding: utf-8
require "logstash/devutils/rspec/spec_helper"
require "logstash/inputs/kafka"
+require 'jruby-kafka'
class LogStash::Inputs::TestKafka < LogStash::Inputs::Kafka
- milestone 1
private
def queue_event(msg, output_queue)
super(msg, output_queue)
# need to raise exception here to stop the infinite loop
raise LogStash::ShutdownSignal
end
end
+class TestKafkaGroup < Kafka::Group
+ def run(a_num_threads, a_queue)
+ a_queue << 'Kafka message'
+ end
+end
+
describe 'inputs/kafka' do
let (:kafka_config) {{'topic_id' => 'test'}}
let (:decorated_kafka_config) {{'topic_id' => 'test', 'decorate_events' => true}}
it "should register" do
@@ -30,30 +36,28 @@
!insist { kafka.reset_beginning }
end
it 'should retrieve event from kafka' do
kafka = LogStash::Inputs::TestKafka.new(kafka_config)
+ expect(kafka).to receive(:create_consumer_group) do |options|
+ TestKafkaGroup.new(options)
+ end
kafka.register
- expect_any_instance_of(Kafka::Group).to receive(:run) do |a_num_threads, a_queue|
- a_queue << 'Kafka message'
- end
-
logstash_queue = Queue.new
kafka.run logstash_queue
e = logstash_queue.pop
insist { e['message'] } == 'Kafka message'
# no metadata by default
insist { e['kafka'] } == nil
end
it 'should retrieve a decorated event from kafka' do
kafka = LogStash::Inputs::TestKafka.new(decorated_kafka_config)
- kafka.register
-
- expect_any_instance_of(Kafka::Group).to receive(:run) do |a_num_threads, a_queue|
- a_queue << 'Kafka message'
+ expect(kafka).to receive(:create_consumer_group) do |options|
+ TestKafkaGroup.new(options)
end
+ kafka.register
logstash_queue = Queue.new
kafka.run logstash_queue
e = logstash_queue.pop
insist { e['message'] } == 'Kafka message'