Sha256: 2b0a4569c58344b22491d5e24e3ad01ea76df430ef525561da83ecb9d85921e0

Contents?: true

Size: 900 Bytes

Versions: 2

Compression:

Stored size: 900 Bytes

Contents

module NulogyMessageBusConsumer
  module Steps
    # Step that creates an Rdkafka consumer, subscribes it to the configured
    # topic, waits for a partition assignment and then yields the consumer
    # to the caller's block.
    class ConnectToMessageBus
      # config - object responding to #bootstrap_servers, #consumer_group_id
      #          and #topic_name.
      # logger - object responding to #info.
      def initialize(config, logger)
        @config = config
        @logger = logger
      end

      # Connects, subscribes and yields `kafka_consumer:` to the given block.
      # Any keyword arguments passed in are accepted and ignored.
      #
      # Returns whatever the block returns.
      def call(**_)
        kafka_consumer = open_consumer
        subscribe_to_topic(kafka_consumer)

        # Close the consumer when the process receives TERM.
        # NOTE(review): the consumer is not closed here when the block
        # returns or raises -- confirm a later step owns that cleanup.
        trap("TERM") { kafka_consumer.close }

        KafkaUtils.wait_for_assignment(kafka_consumer)
        yield(kafka_consumer: kafka_consumer)
      end

      private

      # Logs the connection attempt and builds the consumer from
      # #consumer_config, then logs the consumer group in use.
      def open_consumer
        @logger.info("Connecting to the MessageBus")
        kafka_consumer = Rdkafka::Config.new(consumer_config).consumer
        @logger.info("Using consumer group id: #{@config.consumer_group_id}")
        kafka_consumer
      end

      # Subscribes the consumer to the configured topic and logs it.
      def subscribe_to_topic(kafka_consumer)
        kafka_consumer.subscribe(@config.topic_name)
        @logger.info("Listening for kafka messages on topic #{@config.topic_name}")
      end

      # Settings handed to Rdkafka::Config. Auto-commit is switched off;
      # presumably offsets are committed explicitly elsewhere -- verify.
      def consumer_config
        {
          :"bootstrap.servers" => @config.bootstrap_servers,
          :"enable.auto.commit" => false,
          :"group.id" => @config.consumer_group_id,
        }
      end
    end
  end
end

Version data entries

2 entries across 2 versions & 1 rubygems

Version Path
nulogy_message_bus_consumer-0.3.0 lib/nulogy_message_bus_consumer/steps/connect_to_message_bus.rb
nulogy_message_bus_consumer-0.2.0 lib/nulogy_message_bus_consumer/steps/connect_to_message_bus.rb