Sha256: 2b0a4569c58344b22491d5e24e3ad01ea76df430ef525561da83ecb9d85921e0
Contents?: true
Size: 900 Bytes
Versions: 2
Compression:
Stored size: 900 Bytes
Contents
module NulogyMessageBusConsumer module Steps class ConnectToMessageBus def initialize(config, logger) @config = config @logger = logger end def call(**_) @logger.info("Connecting to the MessageBus") consumer = Rdkafka::Config.new(consumer_config).consumer @logger.info("Using consumer group id: #{@config.consumer_group_id}") consumer.subscribe(@config.topic_name) @logger.info("Listening for kafka messages on topic #{@config.topic_name}") trap("TERM") { consumer.close } KafkaUtils.wait_for_assignment(consumer) yield(kafka_consumer: consumer) end private def consumer_config { "bootstrap.servers": @config.bootstrap_servers, "enable.auto.commit": false, "group.id": @config.consumer_group_id, } end end end end
Version data entries
2 entries across 2 versions & 1 rubygems