require "stringio"
require "kafka/socket_with_timeout"
require "kafka/ssl_socket_with_timeout"
require "kafka/protocol/request_message"
require "kafka/protocol/null_response"
require "kafka/protocol/encoder"
require "kafka/protocol/decoder"

module Kafka

  # An asynchronous response object allows us to deliver a response at some
  # later point in time.
  #
  # When instantiating an AsyncResponse, you provide a response decoder and
  # a block that will force the caller to wait until a response is available.
  class AsyncResponse
    # Use a custom "nil" value so that nil can be an actual value.
    MISSING = Object.new

    # @param decoder [#decode] the object used to decode delivered data.
    # @param block [Proc] invoked by `#call` while no response has been
    #   delivered yet; it is expected to cause `#deliver` to be called.
    def initialize(decoder, &block)
      @decoder = decoder
      @block = block
      @response = MISSING
    end

    # Block until a response is available.
    #
    # @return [Object] the decoded response.
    def call
      # Compare by identity: `==` would dispatch to the decoded response's own
      # equality method, which has no reason to know about the sentinel.
      @block.call if MISSING.equal?(@response)
      @response
    end

    # Deliver the response data.
    #
    # After calling this, `#call` will return the decoded response.
    #
    # @param data [Object] the raw data handed to the decoder's `#decode`.
    # @return [Object] the decoded response.
    def deliver(data)
      @response = @decoder.decode(data)
    end
  end

  # A connection to a single Kafka broker.
  #
  # Usually you'll need a separate connection to each broker in a cluster, since most
  # requests must be directed specifically to the broker that is currently leader for
  # the set of topic partitions you want to produce to or consume from.
  #
  # ## Instrumentation
  #
  # Connections emit a `request.connection.kafka` notification on each request. The following
  # keys will be found in the payload:
  #
  # * `:broker_host` — the hostname of the broker the request is sent to.
  # * `:api` — the name of the API being invoked.
  # * `:request_size` — the number of bytes in the request.
  # * `:response_size` — the number of bytes in the response.
  #
  # When a request fails, the payload additionally contains `:exception` (an
  # array of the error's class name and message) and `:exception_object`.
  #
  # The notification also includes the duration of the request.
  #
  class Connection
    SOCKET_TIMEOUT = 10
    CONNECT_TIMEOUT = 10

    attr_reader :encoder
    attr_reader :decoder

    # Opens a connection to a Kafka broker.
    #
    # @param host [String] the hostname of the broker.
    # @param port [Integer] the port of the broker.
    # @param client_id [String] the client id is a user-specified string sent in each
    #   request to help trace calls and should logically identify the application
    #   making the request.
    # @param logger [Logger] the logger used to log trace messages.
    # @param instrumenter [#start, #finish] the object notified when a request
    #   starts and finishes.
    # @param connect_timeout [Integer] the socket timeout for connecting to the broker.
    #   Default is 10 seconds.
    # @param socket_timeout [Integer] the socket timeout for reading and writing to the
    #   broker. Default is 10 seconds.
    # @param ssl_context [Object, nil] when given, the socket is opened as an
    #   SSLSocketWithTimeout using this context; otherwise a plain
    #   SocketWithTimeout is used.
    #
    # @return [Connection] a new connection.
    def initialize(host:, port:, client_id:, logger:, instrumenter:, connect_timeout: nil, socket_timeout: nil, ssl_context: nil)
      @host, @port, @client_id = host, port, client_id
      @logger = logger
      @instrumenter = instrumenter

      @connect_timeout = connect_timeout || CONNECT_TIMEOUT
      @socket_timeout = socket_timeout || SOCKET_TIMEOUT
      @ssl_context = ssl_context
    end

    # @return [String] the broker address formatted as `host:port`.
    def to_s
      "#{@host}:#{@port}"
    end

    # @return [Boolean] whether a socket exists and has not been closed.
    def open?
      !@socket.nil? && !@socket.closed?
    end

    # Closes the socket, if any, and forgets it so that the next request
    # opens a fresh one.
    #
    # @return [nil]
    def close
      @logger.debug "Closing socket to #{to_s}"

      @socket.close if @socket

      @socket = nil
    end

    # Sends a request over the connection.
    #
    # @param request [#encode, #response_class] the request that should be
    #   encoded and written.
    #
    # @raise [ConnectionError] if the connection fails while sending the
    #   request or waiting for its response.
    # @return [Object] the response.
    def send_request(request)
      # Immediately block on the asynchronous request.
      send_async_request(request).call
    end

    # Sends a request over the connection.
    #
    # @param request [#encode, #response_class] the request that should be
    #   encoded and written.
    #
    # @raise [ConnectionError] if the connection cannot be opened or fails
    #   while the request is being written.
    # @return [AsyncResponse] the async response, allowing the caller to choose
    #   when to block.
    def send_async_request(request)
      # Default notification payload.
      notification = {
        broker_host: @host,
        api: Protocol.api_name(request.api_key),
        request_size: 0,
        response_size: 0,
      }

      @instrumenter.start("request.connection", notification)

      begin
        open unless open?

        @correlation_id += 1

        write_request(request, notification)
      rescue StandardError => e
        # The request never made it onto the wire, so nobody will wait for a
        # response. Finish here so the started notification is not left open.
        record_exception(notification, e)
        @instrumenter.finish("request.connection", notification)
        raise
      end

      response_class = request.response_class
      correlation_id = @correlation_id

      if response_class.nil?
        async_response = AsyncResponse.new(Protocol::NullResponse) { nil }

        # Immediately deliver a nil value.
        async_response.deliver(nil)

        @instrumenter.finish("request.connection", notification)

        async_response
      else
        async_response = AsyncResponse.new(response_class) {
          begin
            # A caller is trying to read the response, so we have to wait for it
            # before we can return.
            wait_for_response(correlation_id, notification)
          ensure
            # Finish the instrumentation whether or not waiting succeeded, so
            # that failure details recorded in the payload are published too.
            @instrumenter.finish("request.connection", notification)
          end
        }

        # Store the asynchronous response so that data can be delivered to it
        # at a later time.
        @pending_async_responses[correlation_id] = async_response

        async_response
      end
    rescue Errno::EPIPE, Errno::ECONNRESET, Errno::ETIMEDOUT, EOFError => e
      close

      raise ConnectionError, "Connection error: #{e}"
    end

    private

    # Opens the socket and resets all per-connection state.
    #
    # @raise [ConnectionError] if connecting times out or fails.
    def open
      @logger.debug "Opening connection to #{@host}:#{@port} with client id #{@client_id}..."

      if @ssl_context
        @socket = SSLSocketWithTimeout.new(@host, @port, connect_timeout: @connect_timeout, timeout: @socket_timeout, ssl_context: @ssl_context)
      else
        @socket = SocketWithTimeout.new(@host, @port, connect_timeout: @connect_timeout, timeout: @socket_timeout)
      end

      @encoder = Kafka::Protocol::Encoder.new(@socket)
      @decoder = Kafka::Protocol::Decoder.new(@socket)

      # Correlation id is initialized to zero and bumped for each request.
      @correlation_id = 0

      # The pipeline of pending response futures must be reset.
      @pending_async_responses = {}
    rescue Errno::ETIMEDOUT => e
      @logger.error "Timed out while trying to connect to #{self}: #{e}"
      raise ConnectionError, e
    rescue SocketError, Errno::ECONNREFUSED, Errno::EHOSTUNREACH => e
      @logger.error "Failed to connect to #{self}: #{e}"
      raise ConnectionError, e
    end

    # Writes a request over the connection.
    #
    # @param request [#encode] the request that should be encoded and written.
    # @param notification [Hash] the instrumentation payload; its
    #   `:request_size` is set to the encoded size in bytes.
    #
    # @return [nil]
    def write_request(request, notification)
      @logger.debug "Sending request #{@correlation_id} to #{to_s}"

      message = Kafka::Protocol::RequestMessage.new(
        api_key: request.api_key,
        # Requests that do not declare a version are sent as version 0.
        api_version: request.respond_to?(:api_version) ? request.api_version : 0,
        correlation_id: @correlation_id,
        client_id: @client_id,
        request: request,
      )

      data = Kafka::Protocol::Encoder.encode_with(message)
      notification[:request_size] = data.bytesize

      @encoder.write_bytes(data)

      nil
    rescue Errno::ETIMEDOUT
      @logger.error "Timed out while writing request #{@correlation_id}"
      raise
    end

    # Reads the next response from the connection.
    #
    # Socket errors are deliberately not translated here: they propagate to
    # `wait_for_response`, which records them in the notification payload,
    # closes the connection and raises ConnectionError.
    #
    # @param expected_correlation_id [Integer] the correlation id the caller is
    #   waiting for; only used for logging.
    # @param notification [Hash] the instrumentation payload; its
    #   `:response_size` is set to the size in bytes of the response read.
    #
    # @return [Array] the correlation id of the response that was read,
    #   followed by a decoder positioned just after it.
    def read_response(expected_correlation_id, notification)
      @logger.debug "Waiting for response #{expected_correlation_id} from #{to_s}"

      data = @decoder.bytes
      notification[:response_size] = data.bytesize

      buffer = StringIO.new(data)
      response_decoder = Kafka::Protocol::Decoder.new(buffer)

      correlation_id = response_decoder.int32

      @logger.debug "Received response #{correlation_id} from #{to_s}"

      return correlation_id, response_decoder
    rescue Errno::ETIMEDOUT
      @logger.error "Timed out while waiting for response #{expected_correlation_id}"
      raise
    end

    # Reads responses until the one with the expected correlation id arrives,
    # delivering each response to its pending AsyncResponse.
    #
    # @param expected_correlation_id [Integer] the correlation id to wait for.
    # @param notification [Hash] the instrumentation payload; failure details
    #   are recorded in it when the connection fails.
    #
    # @raise [Kafka::Error] if a response to a later request is read first.
    # @raise [ConnectionError] if reading from the socket fails.
    # @return [Object] the decoded response.
    def wait_for_response(expected_correlation_id, notification)
      loop do
        correlation_id, data = read_response(expected_correlation_id, notification)

        if correlation_id < expected_correlation_id
          # There may have been a previous request that timed out before the client
          # was able to read the response. In that case, the response will still be
          # sitting in the socket waiting to be read. If the response we just read
          # was to a previous request, we deliver it to the pending async response
          # future.
          async_response = @pending_async_responses.delete(correlation_id)
          async_response.deliver(data) if async_response
        elsif correlation_id > expected_correlation_id
          raise Kafka::Error, "Correlation id mismatch: expected #{expected_correlation_id} but got #{correlation_id}"
        else
          # If the request was asynchronous, deliver the response to the pending
          # async response future.
          async_response = @pending_async_responses.delete(correlation_id)
          async_response.deliver(data)

          return async_response.call
        end
      end
    rescue Errno::EPIPE, Errno::ECONNRESET, Errno::ETIMEDOUT, EOFError => e
      record_exception(notification, e)

      close

      raise ConnectionError, "Connection error: #{e}"
    end

    # Stores the details of a failure in the instrumentation payload.
    #
    # @param notification [Hash] the instrumentation payload.
    # @param error [Exception] the error that made the request fail.
    #
    # @return [Exception] the error.
    def record_exception(notification, error)
      notification[:exception] = [error.class.name, error.message]
      notification[:exception_object] = error
    end
  end
end