Sha256: 5e400b1b8835f159b77ed1870c11d94fbd57c4dae82a5b5aa479a48447482421

Contents?: true

Size: 911 Bytes

Versions: 2

Compression:

Stored size: 911 Bytes

Contents

# encoding: utf-8
require "logstash/devutils/rspec/spec_helper"
require "logstash/inputs/kafka"
require "digest"

# Integration spec for the Kafka input plugin: runs the input against the
# 'topic3' topic and checks that the expected number of events is consumed.
# NOTE(review): presumably requires a reachable Kafka broker whose 'topic3'
# topic has been pre-loaded with exactly `num_events` messages -- confirm
# against the integration test setup.
describe "input/kafka", :integration => true do
  # Plugin settings: read 'topic3' from the earliest offset using the plain codec.
  let(:partition3_config) { { 'topics' => ['topic3'], 'codec' => 'plain', 'auto_offset_reset' => 'earliest' } }

  # Maximum number of one-second polls performed by wait_until_count.
  let(:tries) { 60 }
  # Number of events the example expects to find in the queue.
  let(:num_events) { 103 }

  # Polls +queue+ once per second until it holds exactly +num_events+ items
  # or +tries+ polls have elapsed. Returns without raising on timeout; the
  # caller is expected to assert on the queue size afterwards.
  def wait_until_count(queue)
    tries.times do
      break if queue.size == num_events
      sleep 1
    end
  end

  # Runs +kafka_input+ in a background thread, pushing its events to +queue+.
  # Returns the (already started) Thread so the caller can stop and join it.
  def thread_it(kafka_input, queue)
    Thread.new { kafka_input.run(queue) }
  end

  it "should consume all messages from 3-partition topic" do
    # NOTE(review): the input is run without an explicit `register` call --
    # confirm this plugin version does not need one before `run`.
    kafka_input = LogStash::Inputs::Kafka.new(partition3_config)
    queue = []
    # Thread.new schedules the thread immediately, so no explicit Thread#run
    # is needed (Thread#run would raise ThreadError if the thread had already
    # died, hiding the real failure).
    consumer = thread_it(kafka_input, queue)

    begin
      wait_until_count(queue)

      expect(queue.size).to eq(num_events)
    ensure
      # Always tear the consumer thread down so it does not leak past the
      # example. The join is bounded so a thread stuck in a blocking call
      # cannot hang the suite; joining also re-raises an exception that
      # terminated the thread, surfacing a crashed consumer.
      consumer.kill
      consumer.join(5)
    end
  end

end

Version data entries

2 entries across 2 versions & 1 rubygems

Version Path
logstash-input-kafka-3.0.0.beta4 spec/integration/inputs/kafka_spec.rb
logstash-input-kafka-3.0.0.beta3 spec/integration/inputs/kafka_spec.rb