Sha256: cd82f3c272e61ca3d7717e2cb5f7c9d28fed2ef75503e73e36743ff33a0fb96b
Contents?: true
Size: 1.33 KB
Versions: 6
Compression:
Stored size: 1.33 KB
Contents
module NulogyMessageBusConsumer module Steps class ConnectToMessageBus def initialize(config, logger, kafka_consumer: nil) @config = config @logger = logger @kafka_consumer = kafka_consumer end def call(**_) @logger.info("Connecting to the MessageBus") @logger.info("Using consumer group id: #{@config.consumer_group_id}") subscribe trap("TERM") { kafka_consumer.close } wait_for_assignment yield(kafka_consumer: kafka_consumer) end private def kafka_consumer @kafka_consumer ||= Rdkafka::Config.new(consumer_config).consumer end def consumer_config config = { "bootstrap.servers": @config.bootstrap_servers, "enable.auto.commit": false, "group.id": @config.consumer_group_id, "enable.auto.offset.store": false } config["client.id"] = @config.client_id if @config.client_id config end def subscribe kafka_consumer.subscribe(@config.topic_name) @logger.info("Listening for kafka messages on topic #{@config.topic_name}") end def wait_for_assignment KafkaUtils.wait_for_assignment(kafka_consumer) @logger.info("Connected as client: #{kafka_consumer.member_id}") end end end end
Version data entries
6 entries across 6 versions & 1 rubygems