Sha256: 93f8850121205ba9297f090e818ec382fadba2758bc66d21f30abb645af00f69

Contents?: true

Size: 1.98 KB

Versions: 18

Compression:

Stored size: 1.98 KB

Contents

require "open3"
require "rdkafka"

module Kafka
  module_function

  def kafka_config
    config = {
      "auto.offset.reset": "beginning",
      "bootstrap.servers": test_bootstrap_servers,
      "enable.auto.commit": false,
      "group.id": random_consumer_group
    }

    Rdkafka::Config.new(config)
  end

  def random_topic_name
    "test-topic-#{SecureRandom.uuid}"
  end

  def random_consumer_group
    "ruby-test-consumer-group-#{SecureRandom.uuid}"
  end

  def test_bootstrap_servers
    "kafka:29093"
  end

  def setup_kafka_producer
    kafka_config.producer
  end

  def setup_kafka_consumer(topic_name)
    consumer = kafka_config.consumer
    puts "Subscribing to #{topic_name}"
    sleep 10
    consumer.subscribe(topic_name)
    wait_for_assignment(consumer)
    consumer
  end

  def get_all_messages(consumer, timeout: 500)
    messages = []

    loop do
      message = consumer.poll(timeout)

      return messages unless message

      messages << message
    end
  end

  def wait_for_messages(consumer)
    messages = []
    SpecUtils.wait_for(attempts: 10) do
      messages = Kafka.get_all_messages(consumer)
      messages.present?
    end
    messages
  end

  def create_topic(topic_name)
    cmd = [
      "kaf topic create #{topic_name}",
      "--brokers kafka:29093",
      "--replicas 1",
      "--partitions 3"
    ]
    run(cmd.join(" "))
  end

  def delete_topic(topic_name)
    run("kaf topic delete #{topic_name} --brokers kafka:29093")
  end

  def list_topics
    topics = run("kaf topics --brokers kafka:29093")
    topics.split(" ")
  end

  def run(command)
    stdout, stderr, status = Open3.capture3(command)
    raise <<~OUTPUT if status != 0
      Command `#{command}` failed with:
      STDOUT:
      #{stdout}

      STDERR:
      #{stderr}
    OUTPUT

    stdout
  end

  def wait_for_assignment(consumer)
    SpecUtils.wait_for { !consumer.assignment.empty? }
  end

  def wait_for_unassignment(consumer)
    SpecUtils.wait_for { consumer.assignment.empty? }
  end
end

Version data entries

18 entries across 18 versions & 1 rubygems

Version Path
nulogy_message_bus_producer-5.0.8 spec/support/kafka.rb
nulogy_message_bus_producer-5.0.7 spec/support/kafka.rb
nulogy_message_bus_producer-5.0.6 spec/support/kafka.rb
nulogy_message_bus_producer-5.0.5 spec/support/kafka.rb
nulogy_message_bus_producer-5.0.4 spec/support/kafka.rb
nulogy_message_bus_producer-5.0.3 spec/support/kafka.rb
nulogy_message_bus_producer-5.0.2 spec/support/kafka.rb
nulogy_message_bus_producer-5.0.1 spec/support/kafka.rb
nulogy_message_bus_producer-5.0.1.alpha spec/support/kafka.rb
nulogy_message_bus_producer-5.0.0 spec/support/kafka.rb
nulogy_message_bus_producer-4.0.0 spec/support/kafka.rb
nulogy_message_bus_producer-3.7.0 spec/support/kafka.rb
nulogy_message_bus_producer-3.6.0 spec/support/kafka.rb
nulogy_message_bus_producer-3.5.0 spec/support/kafka.rb
nulogy_message_bus_producer-4.0.0.alpha spec/support/kafka.rb
nulogy_message_bus_producer-3.4.1 spec/support/kafka.rb
nulogy_message_bus_producer-3.4.0 spec/support/kafka.rb
nulogy_message_bus_producer-3.3.0 spec/support/kafka.rb