lib/kafka/connection.rb in ruby-kafka-0.4.1 vs lib/kafka/connection.rb in ruby-kafka-0.4.2

- old
+ new

@@ -46,18 +46,19 @@ # Default is 10 seconds. # @param socket_timeout [Integer] the socket timeout for reading and writing to the # broker. Default is 10 seconds. # # @return [Connection] a new connection. - def initialize(host:, port:, client_id:, logger:, instrumenter:, connect_timeout: nil, socket_timeout: nil, ssl_context: nil) + def initialize(host:, port:, client_id:, logger:, instrumenter:, sasl_authenticator:, connect_timeout: nil, socket_timeout: nil, ssl_context: nil) @host, @port, @client_id = host, port, client_id @logger = logger @instrumenter = instrumenter @connect_timeout = connect_timeout || CONNECT_TIMEOUT @socket_timeout = socket_timeout || SOCKET_TIMEOUT @ssl_context = ssl_context + @sasl_authenticator = sasl_authenticator end def to_s "#{@host}:#{@port}" end @@ -126,9 +127,10 @@ # Correlation id is initialized to zero and bumped for each request. @correlation_id = 0 @last_request = nil + @sasl_authenticator.authenticate!(self) rescue Errno::ETIMEDOUT => e @logger.error "Timed out while trying to connect to #{self}: #{e}" raise ConnectionError, e rescue SocketError, Errno::ECONNREFUSED, Errno::EHOSTUNREACH => e @logger.error "Failed to connect to #{self}: #{e}"