Sha256: baccbadf1b1502cbb89fc5fa46539841ec65bc8512a1b4874d342ed0f0349499

Contents?: true

Size: 1.06 KB

Versions: 2

Compression:

Stored size: 1.06 KB

Contents

# encoding: utf-8
require "logstash/devutils/rspec/spec_helper"
require "logstash/inputs/kafka"
require "concurrent"

class MockConsumer
  def initialize
    @wake = Concurrent::AtomicBoolean.new(false)
  end

  def subscribe(topics)
  end
  
  def poll(ms)
    if @wake.value
      raise org.apache.kafka.common.errors.WakeupException.new
    else
      10.times.map do
        org.apache.kafka.clients.consumer.ConsumerRecord.new("test", 0, 0, "key", "value")
      end
    end
  end

  def close
  end

  def wakeup
    @wake.make_true
  end
end

describe LogStash::Inputs::Kafka do
  let(:config) { { 'topics' => ['test'], 'num_threads' => 4 } }
  subject { LogStash::Inputs::Kafka.new(config) }

  it "should register" do
    expect {subject.register}.to_not raise_error
  end

  it "should run" do
    expect(subject).to receive(:new_consumer) do
      MockConsumer.new
    end.exactly(4).times

    subject.register
    q = Queue.new
    Thread.new do
      while q.size < 13
      end
      subject.do_stop
    end
    subject.run(q)

    expect(q.size).to eq(40)
  end
end

Version data entries

2 entries across 2 versions & 1 rubygems

Version Path
logstash-input-kafka-3.0.0.beta2 spec/unit/inputs/kafka_spec.rb
logstash-input-kafka-3.0.0.beta1 spec/unit/inputs/kafka_spec.rb