Sha256: a0d24297e35ee65715ad5227a9885b3c8a8d2be8bbe97b7f9fd301cd8f772282

Contents?: true

Size: 1.3 KB

Versions: 1

Compression:

Stored size: 1.3 KB

Contents

# encoding: utf-8
require "logstash/devutils/rspec/spec_helper"
require "logstash/inputs/kafka"

# Test-only variant of the Kafka input: it forwards a single event to the
# output queue and then aborts the consume loop so that a spec calling
# `run` can return.
class LogStash::Inputs::TestKafka < LogStash::Inputs::Kafka
  milestone 1

  # Delegates to the parent implementation with the same arguments, then
  # raises LogStash::ShutdownSignal so the otherwise endless loop stops
  # after the first message.
  def queue_event(msg, output_queue)
    super
    raise LogStash::ShutdownSignal
  end
  private :queue_event
end


# Specs for the Kafka input plugin: registration, default configuration
# values, and retrieval of a single event through a stubbed consumer group.
describe 'inputs/kafka' do
  # Minimal configuration: only the topic is supplied, so every other
  # setting is left at whatever default the plugin defines.
  let(:kafka_config) { { 'topic_id' => 'test' } }

  it "should register" do
    input = LogStash::Plugin.lookup("input", "kafka").new(kafka_config)
    expect { input.register }.to_not raise_error
  end

  it 'should populate kafka config with default values' do
    kafka = LogStash::Inputs::TestKafka.new(kafka_config)
    insist { kafka.zk_connect } == 'localhost:2181'
    insist { kafka.topic_id } == 'test'
    insist { kafka.group_id } == 'logstash'
    # The previous `!insist { ... }` merely negated the object returned by
    # `insist` and discarded it, so nothing was actually asserted. Compare
    # explicitly instead.
    # NOTE(review): assumes the plugin default for reset_beginning is exactly
    # `false` (not nil) -- confirm against the plugin's config declaration.
    insist { kafka.reset_beginning } == false
  end

  it 'should retrieve event from kafka' do
    kafka = LogStash::Inputs::TestKafka.new(kafka_config)
    kafka.register

    # Replace the consumer group's `run` so no real Kafka connection is
    # needed: the stub pushes one raw message onto the queue it is handed.
    expect_any_instance_of(Kafka::Group).to receive(:run) do |a_num_threads, a_queue|
      a_queue << 'Kafka message'
    end

    logstash_queue = Queue.new
    # TestKafka raises LogStash::ShutdownSignal after queueing one event,
    # which is what lets this call return.
    kafka.run logstash_queue
    e = logstash_queue.pop
    insist { e['message'] } == 'Kafka message'
    # no metadata by default
    insist { e['kafka'] } == nil
  end

end

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
logstash-input-kafka-0.1.5 spec/inputs/kafka_spec.rb