Sha256: 10f37e34b4c423b68f88a3814c1d650431d0a158e2571145a5a2b3e7f5f79382

Contents?: true

Size: 1.4 KB

Versions: 2

Compression:

Stored size: 1.4 KB

Contents

require 'kafka/sasl_gssapi_authenticator'

module Kafka
  class ConnectionBuilder
    def initialize(client_id:, logger:, instrumenter:, connect_timeout:, socket_timeout:, ssl_context:, sasl_gssapi_principal:, sasl_gssapi_keytab:)
      @client_id = client_id
      @logger = logger
      @instrumenter = instrumenter
      @connect_timeout = connect_timeout
      @socket_timeout = socket_timeout
      @ssl_context = ssl_context
      @sasl_gssapi_principal = sasl_gssapi_principal
      @sasl_gssapi_keytab = sasl_gssapi_keytab
    end

    def build_connection(host, port)
      connection = Connection.new(
        host: host,
        port: port,
        client_id: @client_id,
        connect_timeout: @connect_timeout,
        socket_timeout: @socket_timeout,
        logger: @logger,
        instrumenter: @instrumenter,
        ssl_context: @ssl_context
      )

      if authenticate_using_sasl_gssapi?
        sasl_gssapi_authenticate(connection)
      end

      connection
    end

    private

    def sasl_gssapi_authenticate(connection)
      auth = SaslGssapiAuthenticator.new(
        connection: connection,
        logger: @logger,
        sasl_gssapi_principal: @sasl_gssapi_principal,
        sasl_gssapi_keytab: @sasl_gssapi_keytab
      )

      auth.authenticate!
    end

    def authenticate_using_sasl_gssapi?
      !@ssl_context && @sasl_gssapi_principal && !@sasl_gssapi_principal.empty?
    end
  end
end

Version data entries

2 entries across 2 versions & 1 rubygems

Version Path
ruby-kafka-0.3.18.beta2 lib/kafka/connection_builder.rb
ruby-kafka-0.3.18.beta1 lib/kafka/connection_builder.rb