Sha256: ca124bec562106ff8ddf2433b53fe606bfaeefde2ee33ca0b57d818c64bea5b5
Contents?: true
Size: 1.77 KB
Versions: 1
Compression:
Stored size: 1.77 KB
Contents
require "open3"
require "securerandom"

# Helpers for exercising a Kafka broker from the spec suite.
#
# Every method is exposed as a module function (for example
# `Kafka.create_topic(name)`). The broker address is read from the
# MBC_KAFKA_HOST and MBC_KAFKA_PORT environment variables, and topic
# administration is done by shelling out to the `kaf` command-line tool.
module Kafka
  module_function

  # Poll timeout for consumers used in specs.
  # NOTE(review): presumably milliseconds; it is not referenced in this file,
  # so confirm the unit at the call sites.
  CONSUMER_POLL_TIMEOUT = 1000

  # @return [String] a unique topic name of the form "test-topic-<uuid>"
  def random_topic_name
    "test-topic-#{SecureRandom.uuid}"
  end

  # @return [String] a unique consumer group id of the form
  #   "ruby-test-consumer-group-<uuid>"
  def random_consumer_group
    "ruby-test-consumer-group-#{SecureRandom.uuid}"
  end

  # Builds the "host:port" broker address from the environment.
  #
  # An unset variable interpolates as an empty string, so a missing
  # MBC_KAFKA_HOST or MBC_KAFKA_PORT yields a malformed address rather than an
  # error here.
  #
  # @return [String]
  def test_bootstrap_servers
    "#{ENV["MBC_KAFKA_HOST"]}:#{ENV["MBC_KAFKA_PORT"]}"
  end

  # @return [Object] the producer built from {kafka_producer_config}
  def setup_kafka_producer
    kafka_producer_config.producer
  end

  # @return [Rdkafka::Config] configuration pointing at the test broker
  def kafka_producer_config
    config = {"bootstrap.servers": test_bootstrap_servers}
    Rdkafka::Config.new(config)
  end

  # Creates a consumer, subscribes it to +topic_name+ and hands it to
  # NulogyMessageBusConsumer::KafkaUtils.wait_for_assignment before returning.
  #
  # @param topic_name [String] topic to subscribe to
  # @return [Object] the subscribed consumer
  def setup_kafka_consumer(topic_name)
    consumer = kafka_consumer_config.consumer
    consumer.subscribe(topic_name)
    NulogyMessageBusConsumer::KafkaUtils.wait_for_assignment(consumer)
    consumer
  end

  # Consumer configuration for specs: reads from the beginning of the topic,
  # disables auto-commit and uses a freshly generated group id on every call.
  #
  # @return [Rdkafka::Config]
  def kafka_consumer_config
    config = {
      "auto.offset.reset": "beginning",
      "bootstrap.servers": test_bootstrap_servers,
      "enable.auto.commit": false,
      "group.id": random_consumer_group
    }
    Rdkafka::Config.new(config)
  end

  # Creates +topic_name+ on the test broker with 1 replica and 3 partitions.
  #
  # @param topic_name [String]
  # @return [String] STDOUT of the `kaf` invocation
  # @raise [RuntimeError] when the command fails (see {run})
  def create_topic(topic_name)
    run("kaf topic create #{topic_name} --brokers #{test_bootstrap_servers} --replicas 1 --partitions 3")
  end

  # Deletes +topic_name+ from the test broker.
  #
  # @param topic_name [String]
  # @return [String] STDOUT of the `kaf` invocation
  # @raise [RuntimeError] when the command fails (see {run})
  def delete_topic(topic_name)
    run("kaf topic delete #{topic_name} --brokers #{test_bootstrap_servers}")
  end

  # Runs `kaf topics` against the test broker and splits its output on
  # whitespace.
  #
  # @return [Array<String>] whitespace-separated tokens of the command output
  # @raise [RuntimeError] when the command fails (see {run})
  def list_topics
    topics = run("kaf topics --brokers #{test_bootstrap_servers}")
    topics.split
  end

  # Runs +command+ and returns what it wrote to standard output.
  #
  # @param command [String] command line handed to Open3.capture3
  # @return [String] captured STDOUT
  # @raise [RuntimeError] when the command does not exit successfully; the
  #   message includes the command and its captured STDOUT and STDERR
  def run(command)
    stdout, stderr, status = Open3.capture3(command)
    # `success?` is falsy both for a non-zero exit code and for a process that
    # was terminated by a signal.
    return stdout if status.success?

    raise <<~OUTPUT
      Command `#{command}` failed with:
      STDOUT: #{stdout}
      STDERR: #{stderr}
    OUTPUT
  end

  # Builds a NulogyMessageBusConsumer::Config for +topic_name+ using a fresh
  # random consumer group and the test broker address.
  #
  # @param topic_name [String]
  # @return [NulogyMessageBusConsumer::Config]
  def test_config(topic_name)
    NulogyMessageBusConsumer::Config.new(
      consumer_group_id: random_consumer_group,
      bootstrap_servers: test_bootstrap_servers,
      topic_name: topic_name
    )
  end
end
Version data entries
1 entries across 1 versions & 1 rubygems
Version | Path |
---|---|
nulogy_message_bus_consumer-2.0.1 | spec/support/kafka.rb |