lib/kafka/connection.rb in ruby-kafka-0.1.1 vs lib/kafka/connection.rb in ruby-kafka-0.1.2

- old
+ new

@@ -1,7 +1,8 @@ require "stringio" require "kafka/socket_with_timeout" +require "kafka/instrumentation" require "kafka/protocol/request_message" require "kafka/protocol/encoder" require "kafka/protocol/decoder" module Kafka @@ -9,12 +10,24 @@ # A connection to a single Kafka broker. # # Usually you'll need a separate connection to each broker in a cluster, since most # requests must be directed specifically to the broker that is currently leader for # the set of topic partitions you want to produce to or consumer from. + # + # ## Instrumentation + # + # Connections emit a `request.kafka` notification on each request. The following + # keys will be found in the payload: + # + # * `:api` — the name of the API being invoked. + # * `:request_size` — the number of bytes in the request. + # * `:response_size` — the number of bytes in the response. + # + # The notification also includes the duration of the request. + # class Connection - SOCKET_TIMEOUT = 5 + SOCKET_TIMEOUT = 10 CONNECT_TIMEOUT = 10 # Opens a connection to a Kafka broker. # # @param host [String] the hostname of the broker. @@ -24,46 +37,26 @@ # making the request. # @param logger [Logger] the logger used to log trace messages. # @param connect_timeout [Integer] the socket timeout for connecting to the broker. # Default is 10 seconds. # @param socket_timeout [Integer] the socket timeout for reading and writing to the - # broker. Default is 5 seconds. + # broker. Default is 10 seconds. # # @return [Connection] a new connection. def initialize(host:, port:, client_id:, logger:, connect_timeout: nil, socket_timeout: nil) @host, @port, @client_id = host, port, client_id @logger = logger @connect_timeout = connect_timeout || CONNECT_TIMEOUT @socket_timeout = socket_timeout || SOCKET_TIMEOUT - - @logger.info "Opening connection to #{@host}:#{@port} with client id #{@client_id}..." - - connect end def to_s "#{@host}:#{@port}" end - def connect - @socket = SocketWithTimeout.new(@host, @port, connect_timeout: @connect_timeout, timeout: @socket_timeout) - - @encoder = Kafka::Protocol::Encoder.new(@socket) - @decoder = Kafka::Protocol::Decoder.new(@socket) - - # Correlation id is initialized to zero and bumped for each request. - @correlation_id = 0 - rescue Errno::ETIMEDOUT => e - @logger.error "Timed out while trying to connect to #{self}: #{e}" - raise ConnectionError, e - rescue SocketError, Errno::ECONNREFUSED => e - @logger.error "Failed to connect to #{self}: #{e}" - raise ConnectionError, e - end - - def connected? + def open? !@socket.nil? end def close @logger.debug "Closing socket to #{to_s}" @@ -73,36 +66,28 @@ @socket = nil end # Sends a request over the connection. # - # @param api_key [Integer] the integer code for the API that is invoked. # @param request [#encode] the request that should be encoded and written. # @param response_class [#decode] an object that can decode the response. # # @return [Object] the response that was decoded by `response_class`. - def request(api_key, request, response_class) - connect unless connected? + def send_request(request, response_class) + Instrumentation.instrument("request.kafka") do |notification| + open unless open? - write_request(api_key, request) + @correlation_id += 1 - unless response_class.nil? - loop do - correlation_id, response = read_response(response_class) + # Look up the API name. + notification[:api] = Protocol.api_name(request.api_key) - # There may have been a previous request that timed out before the client - # was able to read the response. In that case, the response will still be - # sitting in the socket waiting to be read. If the response we just read - # was to a previous request, we can safely skip it. - if correlation_id < @correlation_id - @logger.error "Received out-of-order response id #{correlation_id}, was expecting #{@correlation_id}" - elsif correlation_id > @correlation_id - raise Kafka::Error, "Correlation id mismatch: expected #{@correlation_id} but got #{correlation_id}" - else - break response - end - end + # We may not read a response, in which case the size is zero. + notification[:response_size] = 0 + + write_request(request, notification) + wait_for_response(response_class, notification) unless response_class.nil? end rescue Errno::EPIPE, Errno::ECONNRESET, Errno::ETIMEDOUT, EOFError => e @logger.error "Connection error: #{e}" close @@ -110,28 +95,45 @@ raise ConnectionError, "Connection error: #{e}" end private + def open + @logger.info "Opening connection to #{@host}:#{@port} with client id #{@client_id}..." + + @socket = SocketWithTimeout.new(@host, @port, connect_timeout: @connect_timeout, timeout: @socket_timeout) + + @encoder = Kafka::Protocol::Encoder.new(@socket) + @decoder = Kafka::Protocol::Decoder.new(@socket) + + # Correlation id is initialized to zero and bumped for each request. + @correlation_id = 0 + rescue Errno::ETIMEDOUT => e + @logger.error "Timed out while trying to connect to #{self}: #{e}" + raise ConnectionError, e + rescue SocketError, Errno::ECONNREFUSED => e + @logger.error "Failed to connect to #{self}: #{e}" + raise ConnectionError, e + end + # Writes a request over the connection. # - # @param api_key [Integer] the integer code for the API that is invoked. # @param request [#encode] the request that should be encoded and written. # # @return [nil] - def write_request(api_key, request) - @correlation_id += 1 + def write_request(request, notification) @logger.debug "Sending request #{@correlation_id} to #{to_s}" message = Kafka::Protocol::RequestMessage.new( - api_key: api_key, + api_key: request.api_key, correlation_id: @correlation_id, client_id: @client_id, request: request, ) data = Kafka::Protocol::Encoder.encode_with(message) + notification[:request_size] = data.bytesize @encoder.write_bytes(data) nil rescue Errno::ETIMEDOUT @@ -143,16 +145,17 @@ # # @param response_class [#decode] an object that can decode the response from # a given Decoder. # # @return [nil] - def read_response(response_class) + def read_response(response_class, notification) @logger.debug "Waiting for response #{@correlation_id} from #{to_s}" - bytes = @decoder.bytes + data = @decoder.bytes + notification[:response_size] = data.bytesize - buffer = StringIO.new(bytes) + buffer = StringIO.new(data) response_decoder = Kafka::Protocol::Decoder.new(buffer) correlation_id = response_decoder.int32 response = response_class.decode(response_decoder) @@ -160,8 +163,26 @@ return correlation_id, response rescue Errno::ETIMEDOUT @logger.error "Timed out while waiting for response #{@correlation_id}" raise + end + + def wait_for_response(response_class, notification) + loop do + correlation_id, response = read_response(response_class, notification) + + # There may have been a previous request that timed out before the client + # was able to read the response. In that case, the response will still be + # sitting in the socket waiting to be read. If the response we just read + # was to a previous request, we can safely skip it. + if correlation_id < @correlation_id + @logger.error "Received out-of-order response id #{correlation_id}, was expecting #{@correlation_id}" + elsif correlation_id > @correlation_id + raise Kafka::Error, "Correlation id mismatch: expected #{@correlation_id} but got #{correlation_id}" + else + return response + end + end end end end