Sha256: a0d24297e35ee65715ad5227a9885b3c8a8d2be8bbe97b7f9fd301cd8f772282
Contents?: true
Size: 1.3 KB
Versions: 1
Compression:
Stored size: 1.3 KB
Contents
# encoding: utf-8
require "logstash/devutils/rspec/spec_helper"
require "logstash/inputs/kafka"

# Test double for the Kafka input: behaves like the real plugin, but aborts
# the consume loop after the first event has been queued so that `run`
# returns control to the spec.
class LogStash::Inputs::TestKafka < LogStash::Inputs::Kafka
  milestone 1

  private

  # Queues the message through the real implementation, then raises
  # LogStash::ShutdownSignal to break out of the otherwise infinite loop.
  def queue_event(msg, output_queue)
    super(msg, output_queue)
    # need to raise exception here to stop the infinite loop
    raise LogStash::ShutdownSignal
  end
end

describe 'inputs/kafka' do
  # Minimal configuration used by every example below.
  let(:kafka_config) { { 'topic_id' => 'test' } }

  it "should register" do
    input = LogStash::Plugin.lookup("input", "kafka").new(kafka_config)
    expect { input.register }.to_not raise_error
  end

  it 'should populate kafka config with default values' do
    kafka = LogStash::Inputs::TestKafka.new(kafka_config)
    insist { kafka.zk_connect } == 'localhost:2181'
    insist { kafka.topic_id } == 'test'
    insist { kafka.group_id } == 'logstash'
    # `!insist { ... }` merely negated the Insist object and asserted nothing;
    # compare explicitly so a wrong default actually fails the example.
    insist { kafka.reset_beginning } == false
  end

  it 'should retrieve event from kafka' do
    kafka = LogStash::Inputs::TestKafka.new(kafka_config)
    kafka.register

    # Replace the consumer group's `run` so no broker is contacted: push a
    # single canned message onto the queue it is handed.
    expect_any_instance_of(Kafka::Group).to receive(:run) do |_num_threads, a_queue|
      a_queue << 'Kafka message'
    end

    logstash_queue = Queue.new
    kafka.run logstash_queue
    e = logstash_queue.pop
    insist { e['message'] } == 'Kafka message'
    # no metadata by default
    insist { e['kafka'] } == nil
  end
end
Version data entries
1 entries across 1 versions & 1 rubygems
Version | Path |
---|---|
logstash-input-kafka-0.1.5 | spec/inputs/kafka_spec.rb |