Sha256: c0d30e3efbbf0624f18a8940e40b1acad10685792606d78f5b39b37ba0ee28d1
Contents?: true
Size: 791 Bytes
Versions: 11
Compression:
Stored size: 791 Bytes
Contents
# encoding: utf-8
require "logstash/devutils/rspec/spec_helper"
require "logstash/inputs/kafka"
require "concurrent"

# Minimal stand-in for a Kafka consumer.
#
# It exposes subscribe/poll/close/wakeup. Once #wakeup has been called,
# every later #poll raises a WakeupException; before that, #poll returns a
# fixed batch of ten records.
# NOTE(review): no example visible in this spec references MockConsumer —
# confirm whether it is still needed.
class MockConsumer
  def initialize
    # Flipped to true by #wakeup and read by #poll.
    @wake = Concurrent::AtomicBoolean.new(false)
  end

  # Intentionally a no-op; the topics argument is ignored.
  def subscribe(topics)
  end

  # Returns an array of ten identical ConsumerRecord objects, or raises
  # WakeupException once #wakeup has been called. The ms argument is ignored.
  def poll(ms)
    raise org.apache.kafka.common.errors.WakeupException.new if @wake.value

    # Arguments are presumably (topic, partition, offset, key, value) —
    # confirm against the Kafka client version in use.
    Array.new(10) do
      org.apache.kafka.clients.consumer.ConsumerRecord.new("test", 0, 0, "key", "value")
    end
  end

  # Intentionally a no-op.
  def close
  end

  # Marks the consumer as woken so that the next #poll raises.
  def wakeup
    @wake.make_true
  end
end

describe LogStash::Inputs::Kafka do
  let(:config) do
    { 'topics' => ['test'], 'consumer_threads' => 4 }
  end

  subject { LogStash::Inputs::Kafka.new(config) }

  it "should register" do
    expect { subject.register }.not_to raise_error
  end
end
Version data entries
11 entries across 11 versions & 1 rubygems