Sha256: 93f8850121205ba9297f090e818ec382fadba2758bc66d21f30abb645af00f69
Contents?: true
Size: 1.98 KB
Versions: 18
Compression:
Stored size: 1.98 KB
Contents
require "open3"
require "securerandom"
require "rdkafka"

# Test-support helpers for talking to a Kafka broker from specs.
#
# Consumers and producers are built through Rdkafka; topic administration
# shells out to the `kaf` command-line tool. All methods are exposed as
# module functions (e.g. `Kafka.create_topic("name")`).
module Kafka
  module_function

  # Builds an Rdkafka configuration pointing at the test broker.
  #
  # Every call generates a fresh random consumer group id, offsets start at
  # the beginning of the topic, and auto-commit is disabled.
  #
  # @return [Rdkafka::Config]
  def kafka_config
    config = {
      "auto.offset.reset": "beginning",
      "bootstrap.servers": test_bootstrap_servers,
      "enable.auto.commit": false,
      "group.id": random_consumer_group
    }
    Rdkafka::Config.new(config)
  end

  # @return [String] a unique topic name of the form "test-topic-<uuid>"
  def random_topic_name
    "test-topic-#{SecureRandom.uuid}"
  end

  # @return [String] a unique consumer group id of the form
  #   "ruby-test-consumer-group-<uuid>"
  def random_consumer_group
    "ruby-test-consumer-group-#{SecureRandom.uuid}"
  end

  # @return [String] host:port of the broker used by both the Rdkafka
  #   clients and the `kaf` CLI invocations in this module
  def test_bootstrap_servers
    "kafka:29093"
  end

  # @return [Object] a producer built from {kafka_config}
  def setup_kafka_producer
    kafka_config.producer
  end

  # Creates a consumer, subscribes it to +topic_name+ and blocks until the
  # consumer reports a non-empty assignment.
  #
  # @param topic_name [String] topic to subscribe to
  # @param subscribe_delay [Numeric] seconds to sleep before subscribing.
  #   NOTE(review): the delay presumably gives a freshly created topic time
  #   to become visible to the broker -- confirm before lowering it.
  # @return [Object] the subscribed consumer
  def setup_kafka_consumer(topic_name, subscribe_delay: 10)
    consumer = kafka_config.consumer
    puts "Subscribing to #{topic_name}"
    sleep subscribe_delay
    consumer.subscribe(topic_name)
    wait_for_assignment(consumer)
    consumer
  end

  # Polls the consumer repeatedly and collects messages until a poll
  # returns nothing.
  #
  # @param consumer [Object] consumer responding to `poll(timeout)`
  # @param timeout [Integer] value handed to each `poll` call
  #   (presumably milliseconds -- verify against the Rdkafka consumer API)
  # @return [Array] the messages received, in poll order; empty when the
  #   first poll returns nothing
  def get_all_messages(consumer, timeout: 500)
    messages = []
    loop do
      message = consumer.poll(timeout)
      return messages unless message

      messages << message
    end
  end

  # Retries {get_all_messages} through `SpecUtils.wait_for` (10 attempts)
  # until one attempt yields at least one message.
  #
  # @param consumer [Object] consumer to poll
  # @return [Array] the messages collected by the last attempt
  def wait_for_messages(consumer)
    messages = []
    SpecUtils.wait_for(attempts: 10) do
      messages = Kafka.get_all_messages(consumer)
      # Plain Array check; avoids depending on ActiveSupport's `present?`.
      !messages.empty?
    end
    messages
  end

  # Creates +topic_name+ with 1 replica and 3 partitions via `kaf`.
  #
  # @param topic_name [String]
  # @return [String] stdout of the `kaf` command
  # @raise [RuntimeError] when `kaf` exits unsuccessfully
  def create_topic(topic_name)
    run(
      "kaf", "topic", "create", topic_name,
      "--brokers", test_bootstrap_servers,
      "--replicas", "1",
      "--partitions", "3"
    )
  end

  # Deletes +topic_name+ via `kaf`.
  #
  # @param topic_name [String]
  # @return [String] stdout of the `kaf` command
  # @raise [RuntimeError] when `kaf` exits unsuccessfully
  def delete_topic(topic_name)
    run("kaf", "topic", "delete", topic_name, "--brokers", test_bootstrap_servers)
  end

  # Runs `kaf topics` and splits its output on whitespace.
  #
  # @return [Array<String>] every whitespace-separated token printed by
  #   `kaf topics` (not only topic names, if the tool prints other columns)
  # @raise [RuntimeError] when `kaf` exits unsuccessfully
  def list_topics
    topics = run("kaf", "topics", "--brokers", test_bootstrap_servers)
    topics.split(" ")
  end

  # Executes an external command and returns its standard output.
  #
  # Accepts either one command string (interpreted by the shell, as before)
  # or a program name followed by its arguments. The argument-list form
  # bypasses the shell, so values such as topic names are never parsed as
  # shell syntax.
  #
  # @param command [Array<String>] command string, or program plus arguments
  # @return [String] captured stdout
  # @raise [RuntimeError] with the captured stdout/stderr when the command
  #   does not exit successfully
  def run(*command)
    stdout, stderr, status = Open3.capture3(*command)
    return stdout if status.success?

    raise <<~OUTPUT
      Command `#{command.join(" ")}` failed with:
      STDOUT: #{stdout}
      STDERR: #{stderr}
    OUTPUT
  end

  # Blocks (via `SpecUtils.wait_for`) until the consumer's assignment is
  # non-empty.
  #
  # @param consumer [Object] consumer responding to `assignment`
  def wait_for_assignment(consumer)
    SpecUtils.wait_for { !consumer.assignment.empty? }
  end

  # Blocks (via `SpecUtils.wait_for`) until the consumer's assignment is
  # empty.
  #
  # @param consumer [Object] consumer responding to `assignment`
  def wait_for_unassignment(consumer)
    SpecUtils.wait_for { consumer.assignment.empty? }
  end
end
Version data entries
18 entries across 18 versions & 1 rubygems