Sha256: cd82f3c272e61ca3d7717e2cb5f7c9d28fed2ef75503e73e36743ff33a0fb96b

Contents?: true

Size: 1.33 KB

Versions: 6

Compression:

Stored size: 1.33 KB

Contents

module NulogyMessageBusConsumer
  module Steps
    class ConnectToMessageBus
      def initialize(config, logger, kafka_consumer: nil)
        @config = config
        @logger = logger
        @kafka_consumer = kafka_consumer
      end

      def call(**_)
        @logger.info("Connecting to the MessageBus")
        @logger.info("Using consumer group id: #{@config.consumer_group_id}")

        subscribe

        trap("TERM") { kafka_consumer.close }

        wait_for_assignment

        yield(kafka_consumer: kafka_consumer)
      end

      private

      def kafka_consumer
        @kafka_consumer ||= Rdkafka::Config.new(consumer_config).consumer
      end

      def consumer_config
        config = {
          "bootstrap.servers": @config.bootstrap_servers,
          "enable.auto.commit": false,
          "group.id": @config.consumer_group_id,
          "enable.auto.offset.store": false
        }

        config["client.id"] = @config.client_id if @config.client_id

        config
      end

      def subscribe
        kafka_consumer.subscribe(@config.topic_name)
        @logger.info("Listening for kafka messages on topic #{@config.topic_name}")
      end

      def wait_for_assignment
        KafkaUtils.wait_for_assignment(kafka_consumer)
        @logger.info("Connected as client: #{kafka_consumer.member_id}")
      end
    end
  end
end

Version data entries

6 entries across 6 versions & 1 rubygems

Version Path
nulogy_message_bus_consumer-2.0.1 lib/nulogy_message_bus_consumer/steps/connect_to_message_bus.rb
nulogy_message_bus_consumer-2.0.0 lib/nulogy_message_bus_consumer/steps/connect_to_message_bus.rb
nulogy_message_bus_consumer-1.0.0 lib/nulogy_message_bus_consumer/steps/connect_to_message_bus.rb
nulogy_message_bus_consumer-0.5.0 lib/nulogy_message_bus_consumer/steps/connect_to_message_bus.rb
nulogy_message_bus_consumer-1.0.0.alpha lib/nulogy_message_bus_consumer/steps/connect_to_message_bus.rb
nulogy_message_bus_consumer-0.4.0 lib/nulogy_message_bus_consumer/steps/connect_to_message_bus.rb