lib/kafka/connection.rb in ruby-kafka-0.1.4 vs lib/kafka/connection.rb in ruby-kafka-0.1.5

- old
+ new

@@ -13,11 +13,11 @@ # requests must be directed specifically to the broker that is currently leader for # the set of topic partitions you want to produce to or consumer from. # # ## Instrumentation # - # Connections emit a `request.kafka` notification on each request. The following + # Connections emit a `request.connection.kafka` notification on each request. The following # keys will be found in the payload: # # * `:api` — the name of the API being invoked. # * `:request_size` — the number of bytes in the request. # * `:response_size` — the number of bytes in the response. @@ -66,26 +66,29 @@ @socket = nil end # Sends a request over the connection. # - # @param request [#encode] the request that should be encoded and written. - # @param response_class [#decode] an object that can decode the response. + # @param request [#encode, #response_class] the request that should be + # encoded and written. # - # @return [Object] the response that was decoded by `response_class`. - def send_request(request, response_class) - Instrumentation.instrument("request.kafka") do |notification| + # @return [Object] the response. + def send_request(request) + # Default notification payload. + notification = { + api: Protocol.api_name(request.api_key), + request_size: 0, + response_size: 0, + } + + Instrumentation.instrument("request.connection.kafka", notification) do open unless open? @correlation_id += 1 - # Look up the API name. - notification[:api] = Protocol.api_name(request.api_key) - - # We may not read a response, in which case the size is zero. - notification[:response_size] = 0 - write_request(request, notification) + + response_class = request.response_class wait_for_response(response_class, notification) unless response_class.nil? end rescue Errno::EPIPE, Errno::ECONNRESET, Errno::ETIMEDOUT, EOFError => e close