lib/kafka/connection.rb in ruby-kafka-0.4.1 vs lib/kafka/connection.rb in ruby-kafka-0.4.2
- old
+ new
@@ -46,18 +46,19 @@
# Default is 10 seconds.
# @param socket_timeout [Integer] the socket timeout for reading and writing to the
# broker. Default is 10 seconds.
#
# @return [Connection] a new connection.
- def initialize(host:, port:, client_id:, logger:, instrumenter:, connect_timeout: nil, socket_timeout: nil, ssl_context: nil)
+ def initialize(host:, port:, client_id:, logger:, instrumenter:, sasl_authenticator:, connect_timeout: nil, socket_timeout: nil, ssl_context: nil)
@host, @port, @client_id = host, port, client_id
@logger = logger
@instrumenter = instrumenter
@connect_timeout = connect_timeout || CONNECT_TIMEOUT
@socket_timeout = socket_timeout || SOCKET_TIMEOUT
@ssl_context = ssl_context
+ @sasl_authenticator = sasl_authenticator
end
def to_s
"#{@host}:#{@port}"
end
@@ -126,9 +127,10 @@
# Correlation id is initialized to zero and bumped for each request.
@correlation_id = 0
@last_request = nil
+ @sasl_authenticator.authenticate!(self)
rescue Errno::ETIMEDOUT => e
@logger.error "Timed out while trying to connect to #{self}: #{e}"
raise ConnectionError, e
rescue SocketError, Errno::ECONNREFUSED, Errno::EHOSTUNREACH => e
@logger.error "Failed to connect to #{self}: #{e}"