Sha256: 9348ca45629b071dde5af798bd4dd7b44f26318b64f106a062084fd5bc9a55b9

Contents?: true

Size: 799 Bytes

Versions: 45

Compression:

Stored size: 799 Bytes

Contents

# encoding: utf-8
require "logstash/devutils/rspec/spec_helper"
require "logstash/inputs/kafka"
require "concurrent"

# Test double exposing the subscribe/poll/close/wakeup methods. poll hands
# back a fixed batch of records until wakeup has been called; after that it
# raises WakeupException.
class MockConsumer
  def initialize
    # Flag set by #wakeup and read by #poll.
    @wake = Concurrent::AtomicBoolean.new(false)
  end

  # No-op; the topics argument is ignored.
  def subscribe(topics)
  end

  # Returns an Array of ten identical ConsumerRecord objects, or raises
  # WakeupException once the wake flag has been set. The ms argument is unused.
  def poll(ms)
    raise org.apache.kafka.common.errors.WakeupException.new if @wake.value

    Array.new(10) do
      org.apache.kafka.clients.consumer.ConsumerRecord.new("logstash", 0, 0, "key", "value")
    end
  end

  # No-op.
  def close
  end

  # Sets the wake flag so that subsequent #poll calls raise.
  def wakeup
    @wake.make_true
  end
end

# Spec for the Kafka input: a plugin built from a topic list and four consumer
# threads must be able to call register without raising.
describe LogStash::Inputs::Kafka do
  let(:config) do
    {
      'topics' => ['logstash'],
      'consumer_threads' => 4
    }
  end

  subject { described_class.new(config) }

  it "should register" do
    expect { subject.register }.not_to raise_error
  end
end

Version data entries

45 entries across 45 versions & 2 rubygems

Version Path
logstash-input-kafka-6.0.0 spec/unit/inputs/kafka_spec.rb
logstash-input-kafka-5.0.5 spec/unit/inputs/kafka_spec.rb
logstash-input-kafka-5.0.4 spec/unit/inputs/kafka_spec.rb
logstash-input-kafka-5.0.3 spec/unit/inputs/kafka_spec.rb
logstash-input-kafka-5.0.2 spec/unit/inputs/kafka_spec.rb