Sha256: bf1563cedb26376e1ebf25a0d476b77660dc3ad24e5f062a812252e7c80a07f5

Contents?: true

Size: 1.22 KB

Versions: 1

Compression:

Stored size: 1.22 KB

Contents

require "spec_helper"
require "turbine/consumer/kafka"
require "turbine/kafka_helper"
require "benchmark"

# Integration-style spec for Turbine::Consumer::Kafka.
#
# Before the examples run, a uniquely named topic is created and filled through
# Turbine::KafkaHelper; the example then reads the messages back with the
# consumer and reports the observed throughput on standard error.
RSpec.describe Turbine::Consumer::Kafka do
  # Count passed to KafkaHelper.fill_topic and the number of messages the
  # example expects to read back.
  MESSAGE_COUNT = 100_000

  # Exposes the topic name set in the `before :all` hook to the examples.
  let(:example_topic) { @example_topic }

  # Builds a consumer for the example topic, yields it to the block, and
  # closes it afterwards even if the block raises.
  #
  # NOTE(review): the two address lists look like Kafka brokers and ZooKeeper
  # hosts on their default local ports -- confirm against the consumer's
  # constructor.
  #
  # @yieldparam consumer [Turbine::Consumer::Kafka]
  def with_consumer
    consumer = described_class.new(
      "my-consumer-group",
      ["localhost:9092"],
      ["localhost:2181"],
      example_topic
    )

    begin
      yield consumer
    ensure
      consumer.close
    end
  end

  before :all do
    # Millisecond-resolution timestamp keeps the topic name unique per run.
    timestamp = Time.now.strftime("%Y%m%d%H%M%S%L")

    @example_topic = "turbine-kafka-specs-#{timestamp}"
    Turbine::KafkaHelper.create_topic(@example_topic)
    Turbine::KafkaHelper.fill_topic(@example_topic, MESSAGE_COUNT)
  end

  after :all do
    Turbine::KafkaHelper.delete_topic(@example_topic)
  end

  it "fetches batches of messages" do
    received = 0

    with_consumer do |consumer|
      # Keep fetching until the expected number of messages has been seen;
      # each fetch returns a batch whose size is added to the running total.
      elapsed = Benchmark.realtime do
        received += consumer.fetch.size while received < MESSAGE_COUNT
      end

      # The rate string carries no parentheses of its own; the surrounding
      # message supplies the single balanced pair.
      rate = format("%.2f msg/sec", received / elapsed)
      $stderr.puts(
        "*** Performance: #{received} messages in #{format('%.2f', elapsed)} seconds (#{rate})"
      )
    end

    expect(received).to eq MESSAGE_COUNT
  end
end

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
turbine-1.0.0.pre2 spec/turbine/consumer/kafka_spec.rb