Sha256: 5a6186d41508cbebce6a7352b7cd6c72850651bf66ced2ce79457558aaddd78f

Contents?: true

Size: 1.89 KB

Versions: 4

Compression:

Stored size: 1.89 KB

Contents

# frozen_string_literal: true

module Sbmt
  module KafkaProducer
    module Config
      # Typed Kafka connection settings for the producer.
      #
      # Holds the broker list plus a handful of timeout/retry knobs under short,
      # friendly names, and converts them into the dotted option names listed in
      # KAFKA_CONFIG_KEYS_REMAP via #to_kafka_options.
      class Kafka < Dry::Struct
        transform_keys(&:to_sym)

        # srv1:port1,srv2:port2,...
        # Anchored with \A/\z (whole string) instead of ^/$ (per line), so a
        # value containing a newline cannot slip past validation.
        SERVERS_REGEXP = /\A[a-z\d.\-:]+(,[a-z\d.\-:]+)*\z/.freeze

        # Maps struct attribute names to the dotted option names emitted by
        # #to_kafka_options.
        # https://github.com/karafka/waterdrop/blob/master/lib/waterdrop/config.rb
        KAFKA_CONFIG_KEYS_REMAP = {
          servers: :"bootstrap.servers",
          connect_timeout: :"socket.connection.setup.timeout.ms",
          message_timeout: :"message.timeout.ms",
          ack_timeout: :"request.timeout.ms",
          retry_backoff: :"retry.backoff.ms",
          max_retries: :"message.send.max.retries",
          required_acks: :"request.required.acks"
        }.freeze

        # Comma-separated list of brokers; must match SERVERS_REGEXP.
        attribute :servers, Sbmt::KafkaProducer::Types::String.constrained(format: SERVERS_REGEXP)

        # Timeouts and backoff map to `*.ms` options, i.e. they are in milliseconds.
        # NOTE(review): the original comment states that these defaults are
        # rdkafka's; verify each value against
        # https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md
        attribute :connect_timeout, Sbmt::KafkaProducer::Types::Coercible::Integer.optional.default(2000)
        attribute :ack_timeout, Sbmt::KafkaProducer::Types::Coercible::Integer.optional.default(1000)
        attribute :retry_backoff, Sbmt::KafkaProducer::Types::Coercible::Integer.optional.default(1000)
        attribute :message_timeout, Sbmt::KafkaProducer::Types::Coercible::Integer.optional.default(55000)
        attribute :required_acks, Sbmt::KafkaProducer::Types::Coercible::Integer.optional.default(-1)
        attribute :max_retries, Sbmt::KafkaProducer::Types::Coercible::Integer.optional.default(2)

        # Free-form extra options, merged underneath the mapped attributes.
        attribute :kafka_config, Sbmt::KafkaProducer::Types::ConfigAttrs.optional.default({}.freeze)

        # Builds the options hash keyed by dotted option names.
        #
        # Mapped attributes take precedence over same-named entries in
        # +kafka_config+. Attributes explicitly set to nil are skipped, so they
        # neither emit a nil option nor shadow a value given in +kafka_config+.
        #
        # @return [Hash{Symbol => Object}] symbol-keyed option hash
        def to_kafka_options
          cfg = KAFKA_CONFIG_KEYS_REMAP.each_with_object({}) do |(key, kafka_key), hash|
            value = self[key]
            # Attributes are optional: nil means "not configured", not "send nil".
            hash[kafka_key] = value unless value.nil?
          end

          # kafka_config is optional too; an explicit nil is treated as "no extras".
          (kafka_config || {}).symbolize_keys.merge(cfg)
        end
      end
    end
  end
end

Version data entries

4 entries across 4 versions & 1 rubygems

Version Path
sbmt-kafka_producer-3.2.0 lib/sbmt/kafka_producer/config/kafka.rb
sbmt-kafka_producer-3.1.1 lib/sbmt/kafka_producer/config/kafka.rb
sbmt-kafka_producer-3.1.0 lib/sbmt/kafka_producer/config/kafka.rb
sbmt-kafka_producer-3.0.0 lib/sbmt/kafka_producer/config/kafka.rb