Sha256: 9348ca45629b071dde5af798bd4dd7b44f26318b64f106a062084fd5bc9a55b9
Contents?: true
Size: 799 Bytes
Versions: 45
Compression:
Stored size: 799 Bytes
Contents
# encoding: utf-8 require "logstash/devutils/rspec/spec_helper" require "logstash/inputs/kafka" require "concurrent" class MockConsumer def initialize @wake = Concurrent::AtomicBoolean.new(false) end def subscribe(topics) end def poll(ms) if @wake.value raise org.apache.kafka.common.errors.WakeupException.new else 10.times.map do org.apache.kafka.clients.consumer.ConsumerRecord.new("logstash", 0, 0, "key", "value") end end end def close end def wakeup @wake.make_true end end describe LogStash::Inputs::Kafka do let(:config) { { 'topics' => ['logstash'], 'consumer_threads' => 4 } } subject { LogStash::Inputs::Kafka.new(config) } it "should register" do expect {subject.register}.to_not raise_error end end
Version data entries
45 entries across 45 versions & 2 rubygems