Sha256: c0d30e3efbbf0624f18a8940e40b1acad10685792606d78f5b39b37ba0ee28d1

Contents?: true

Size: 791 Bytes

Versions: 11

Compression:

Stored size: 791 Bytes

Contents

# encoding: utf-8
require "logstash/devutils/rspec/spec_helper"
require "logstash/inputs/kafka"
require "concurrent"

class MockConsumer
  def initialize
    @wake = Concurrent::AtomicBoolean.new(false)
  end

  def subscribe(topics)
  end
  
  def poll(ms)
    if @wake.value
      raise org.apache.kafka.common.errors.WakeupException.new
    else
      10.times.map do
        org.apache.kafka.clients.consumer.ConsumerRecord.new("test", 0, 0, "key", "value")
      end
    end
  end

  def close
  end

  def wakeup
    @wake.make_true
  end
end

describe LogStash::Inputs::Kafka do
  let(:config) { { 'topics' => ['test'], 'consumer_threads' => 4 } }
  subject { LogStash::Inputs::Kafka.new(config) }

  it "should register" do
    expect {subject.register}.to_not raise_error
  end
end

Version data entries

11 entries across 11 versions & 1 rubygems

Version Path
logstash-input-kafka-4.1.1 spec/unit/inputs/kafka_spec.rb
logstash-input-kafka-4.1.0 spec/unit/inputs/kafka_spec.rb
logstash-input-kafka-5.0.1 spec/unit/inputs/kafka_spec.rb
logstash-input-kafka-3.0.3 spec/unit/inputs/kafka_spec.rb
logstash-input-kafka-3.0.2 spec/unit/inputs/kafka_spec.rb
logstash-input-kafka-4.0.0 spec/unit/inputs/kafka_spec.rb
logstash-input-kafka-3.0.0.beta7 spec/unit/inputs/kafka_spec.rb
logstash-input-kafka-3.0.0.beta6 spec/unit/inputs/kafka_spec.rb
logstash-input-kafka-3.0.0.beta5 spec/unit/inputs/kafka_spec.rb
logstash-input-kafka-3.0.0.beta4 spec/unit/inputs/kafka_spec.rb
logstash-input-kafka-3.0.0.beta3 spec/unit/inputs/kafka_spec.rb