Sha256: ca124bec562106ff8ddf2433b53fe606bfaeefde2ee33ca0b57d818c64bea5b5

Contents?: true

Size: 1.77 KB

Versions: 1

Compression:

Stored size: 1.77 KB

Contents

require "open3"
require "securerandom"

# Spec-support helpers for exercising a Kafka broker: random topic/group
# names, rdkafka producer/consumer construction, and topic management through
# the `kaf` command-line tool.
#
# Every method is exposed as a module function (e.g. `Kafka.create_topic`).
module Kafka
  module_function

  # Not referenced inside this module.
  # NOTE(review): presumably a poll timeout in milliseconds for callers
  # elsewhere -- confirm the unit against its users.
  CONSUMER_POLL_TIMEOUT = 1000

  # @return [String] a topic name of the form "test-topic-<uuid>"
  def random_topic_name
    "test-topic-#{SecureRandom.uuid}"
  end

  # @return [String] a group id of the form "ruby-test-consumer-group-<uuid>"
  def random_consumer_group
    "ruby-test-consumer-group-#{SecureRandom.uuid}"
  end

  # Builds the "host:port" broker address from the MBC_KAFKA_HOST and
  # MBC_KAFKA_PORT environment variables. An unset variable interpolates as an
  # empty string.
  #
  # @return [String]
  def test_bootstrap_servers
    "#{ENV["MBC_KAFKA_HOST"]}:#{ENV["MBC_KAFKA_PORT"]}"
  end

  # @return the producer built from {kafka_producer_config}
  def setup_kafka_producer
    kafka_producer_config.producer
  end

  # @return [Rdkafka::Config] producer configuration pointing at the test broker
  def kafka_producer_config
    config = {"bootstrap.servers": test_bootstrap_servers}

    Rdkafka::Config.new(config)
  end

  # Creates a consumer, subscribes it to +topic_name+ and hands it to
  # NulogyMessageBusConsumer::KafkaUtils.wait_for_assignment before returning.
  #
  # @param topic_name [String] topic to subscribe to
  # @return the subscribed consumer
  def setup_kafka_consumer(topic_name)
    consumer = kafka_consumer_config.consumer
    consumer.subscribe(topic_name)
    NulogyMessageBusConsumer::KafkaUtils.wait_for_assignment(consumer)
    consumer
  end

  # Each call generates a fresh random consumer group id.
  #
  # @return [Rdkafka::Config] consumer configuration for the test broker
  def kafka_consumer_config
    config = {
      "auto.offset.reset": "beginning",
      "bootstrap.servers": test_bootstrap_servers,
      "enable.auto.commit": false,
      "group.id": random_consumer_group
    }

    Rdkafka::Config.new(config)
  end

  # Creates +topic_name+ with 1 replica and 3 partitions via `kaf`.
  #
  # @param topic_name [#to_s]
  # @return [String] stdout of the `kaf` invocation
  # @raise [RuntimeError] if `kaf` exits unsuccessfully
  def create_topic(topic_name)
    # Arguments are passed as an argv list so no shell ever re-interprets the
    # topic name or broker address.
    run(
      "kaf", "topic", "create", topic_name.to_s,
      "--brokers", test_bootstrap_servers,
      "--replicas", "1",
      "--partitions", "3"
    )
  end

  # Deletes +topic_name+ via `kaf`.
  #
  # @param topic_name [#to_s]
  # @return [String] stdout of the `kaf` invocation
  # @raise [RuntimeError] if `kaf` exits unsuccessfully
  def delete_topic(topic_name)
    run("kaf", "topic", "delete", topic_name.to_s, "--brokers", test_bootstrap_servers)
  end

  # Returns the whitespace-separated tokens printed by `kaf topics`.
  # NOTE(review): every token of the output is returned, not only topic
  # names -- confirm against the actual `kaf` output format.
  #
  # @return [Array<String>]
  # @raise [RuntimeError] if `kaf` exits unsuccessfully
  def list_topics
    topics = run("kaf", "topics", "--brokers", test_bootstrap_servers)
    topics.split(" ")
  end

  # Runs an external command and returns its standard output.
  #
  # Accepts either a single command-line string (handed to Open3.capture3
  # unchanged, exactly as before) or a program name followed by its arguments,
  # in which case the program is executed directly without a shell.
  #
  # @param command [Array<String>] command line, or program plus arguments
  # @return [String] captured stdout
  # @raise [RuntimeError] when the command does not exit successfully; the
  #   message contains the command and the captured stdout and stderr
  def run(*command)
    stdout, stderr, status = Open3.capture3(*command)
    # success? is nil for a child killed by a signal, so that also counts as
    # a failure here.
    return stdout if status.success?

    raise <<~OUTPUT
      Command `#{command.join(" ")}` failed with:
      STDOUT:
      #{stdout}

      STDERR:
      #{stderr}
    OUTPUT
  end

  # Builds a NulogyMessageBusConsumer::Config for +topic_name+ against the
  # test broker, using a freshly generated consumer group id.
  #
  # @param topic_name [String]
  # @return [NulogyMessageBusConsumer::Config]
  def test_config(topic_name)
    NulogyMessageBusConsumer::Config.new(
      consumer_group_id: random_consumer_group,
      bootstrap_servers: test_bootstrap_servers,
      topic_name: topic_name
    )
  end
end

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
nulogy_message_bus_consumer-2.0.1 spec/support/kafka.rb