Sha256: 5e400b1b8835f159b77ed1870c11d94fbd57c4dae82a5b5aa479a48447482421
Contents?: true
Size: 911 Bytes
Versions: 2
Compression:
Stored size: 911 Bytes
Contents
# encoding: utf-8
require "logstash/devutils/rspec/spec_helper"
require "logstash/inputs/kafka"
require "digest"
require "thread"

# Integration spec for the Kafka input plugin: starts the input in a
# background thread and checks that the expected number of events arrives.
# NOTE(review): assumes a reachable Kafka broker where 'topic3' already holds
# exactly `num_events` messages -- the seeding step is not visible in this file.
describe "input/kafka", :integration => true do
  let(:partition3_config) { { 'topics' => ['topic3'], 'codec' => 'plain', 'auto_offset_reset' => 'earliest'} }
  # Maximum number of one-second polls before giving up.
  let(:tries) { 60 }
  # Number of events the example expects to read from the topic.
  let(:num_events) { 103 }

  # Polls `queue` once per second until it holds exactly `num_events` items
  # or `tries` polls have elapsed. Never raises on timeout; the caller
  # asserts on the final size.
  #
  # @param queue [#size] collection the input pushes events onto
  def wait_until_count(queue)
    tries.times do
      break if queue.size == num_events
      sleep 1
    end
  end

  # Runs the input in its own thread so the example can poll the queue.
  # Thread.new starts the thread immediately; no explicit wake-up is needed.
  #
  # @param kafka_input [LogStash::Inputs::Kafka] input under test
  # @param queue [#<<] destination for events produced by the input
  # @return [Thread] the thread executing `kafka_input.run`
  def thread_it(kafka_input, queue)
    Thread.new { kafka_input.run(queue) }
  end

  it "should consume all messages from 3-partition topic" do
    kafka_input = LogStash::Inputs::Kafka.new(partition3_config)
    # Queue (not Array) because it is written from the input thread and
    # read from the spec thread.
    queue = Queue.new
    t = thread_it(kafka_input, queue)
    begin
      wait_until_count(queue)
      expect(queue.size).to eq(num_events)
    ensure
      # Always tear the consumer thread down, even when the expectation
      # fails, so it does not leak into later examples.
      t.kill
      t.join(30)
    end
  end
end
Version data entries
2 entries across 2 versions & 1 rubygems
Version | Path |
---|---|
logstash-input-kafka-3.0.0.beta4 | spec/integration/inputs/kafka_spec.rb |
logstash-input-kafka-3.0.0.beta3 | spec/integration/inputs/kafka_spec.rb |