Sha256: dd0cda1dd75059a81e4757b886f60209f5ba765547987ff5a6f23e520cd06af8

Contents?: true

Size: 1.78 KB

Versions: 3

Compression:

Stored size: 1.78 KB

Contents

require 'kafka'

require_relative 'kafka/consumer'
require_relative 'kafka/producer'

module Messaging
  module Adapters
    # Internal: Adapter for producing and consuming messages with Kafka.
    #
    # Loading this file registers the adapter (see .register! at the bottom
    # of the class) under the :kafka key.
    class Kafka
      # Internal: Ruby-Kafka client for the current thread.
      #
      # We keep one per thread as the client itself is not meant to be shared.
      # See https://github.com/zendesk/ruby-kafka#thread-safety for more information.
      #
      # Returns the client memoized in Thread.current[:messaging_kafka_client],
      # creating it on first use.
      def client
        Thread.current[:messaging_kafka_client] ||= create_kafka_client
      end

      # The producer doesn't need to be per thread as it
      # uses a background thread to deliver messages.
      #
      # Returns the Producer memoized on this adapter instance.
      def dispatcher
        @dispatcher ||= Producer.new(self)
      end

      # Internal: Build a Messaging consumer bound to this adapter.
      #
      # name    - The name handed to Consumer.new as `name:`.
      # options - Extra keyword arguments forwarded untouched to Consumer.new.
      #
      # Returns a new Consumer.
      def create_consumer(name, **options)
        Consumer.new(name: name, kafka_adapter: self, **options)
      end

      # Internal: Setup a Ruby-Kafka consumer
      #
      # group_id - The consumer group id. Note that Config.kafka.consumer is
      #            merged last, so a group_id configured there takes precedence.
      #
      # Returns whatever the current thread's client returns from #consumer.
      def create_kafka_consumer(group_id)
        consumer_options = { group_id: group_id }.merge(Config.kafka.consumer.to_h)

        # Double-splat so the options are passed as keyword arguments. Ruby 3
        # no longer converts a trailing positional Hash into keywords, so
        # passing the Hash itself raises ArgumentError there.
        # Assumes the config hash uses Symbol keys - TODO confirm.
        client.consumer(**consumer_options)
      end

      private

      # Internal: Setup a Ruby-Kafka client
      #
      # Returns a new ::Kafka client configured from Config.kafka.client, with
      # client_id and logger always overridden by this adapter.
      def create_kafka_client
        # We have a specific logger for Ruby-Kafka as it
        # is a bit more noisy than we would like.
        # The logger is dup'ed so changing its level leaves Messaging.logger alone.
        kafka_logger = Messaging.logger.dup.tap { |logger| logger.level = Config.kafka.log_level }
        client_options = Config.kafka.client.to_h.merge(client_id: Config.app_name, logger: kafka_logger)

        # Passed as keywords (not as a positional Hash) for the same Ruby 3
        # keyword-argument reason as in #create_kafka_consumer.
        ::Kafka.new(**client_options)
      end

      # Internal: Register this adapter, its consumer and its dispatcher under
      # the :kafka key. Does nothing when :kafka is already registered.
      #
      # The bare `private` above only affects instance methods, which is why
      # this singleton method is hidden with private_class_method below.
      def self.register!
        return if Adapters.key? :kafka

        Adapters.register(:kafka, memoize: true) { Kafka.new }
        Adapters::Consumer.register(:kafka, memoize: true) { Adapters[:kafka] }
        Adapters::Dispatcher.register(:kafka, memoize: true) { Adapters[:kafka].dispatcher }
      end
      private_class_method :register!

      # Runs once, when the class body is evaluated at load time.
      register!
    end
  end
end

Version data entries

3 entries across 3 versions & 1 rubygem

Version Path
messaging-3.8.0 lib/messaging/adapters/kafka.rb
messaging-3.7.3 lib/messaging/adapters/kafka.rb
messaging-3.7.2 lib/messaging/adapters/kafka.rb