lib/kafka/connection.rb in ruby-kafka-0.1.4 vs lib/kafka/connection.rb in ruby-kafka-0.1.5
- old
+ new
@@ -13,11 +13,11 @@
# requests must be directed specifically to the broker that is currently leader for
# the set of topic partitions you want to produce to or consumer from.
#
# ## Instrumentation
#
- # Connections emit a `request.kafka` notification on each request. The following
+ # Connections emit a `request.connection.kafka` notification on each request. The following
# keys will be found in the payload:
#
# * `:api` — the name of the API being invoked.
# * `:request_size` — the number of bytes in the request.
# * `:response_size` — the number of bytes in the response.
@@ -66,26 +66,29 @@
@socket = nil
end
# Sends a request over the connection.
#
- # @param request [#encode] the request that should be encoded and written.
- # @param response_class [#decode] an object that can decode the response.
+ # @param request [#encode, #response_class] the request that should be
+ # encoded and written.
#
- # @return [Object] the response that was decoded by `response_class`.
- def send_request(request, response_class)
- Instrumentation.instrument("request.kafka") do |notification|
+ # @return [Object] the response.
+ def send_request(request)
+ # Default notification payload.
+ notification = {
+ api: Protocol.api_name(request.api_key),
+ request_size: 0,
+ response_size: 0,
+ }
+
+ Instrumentation.instrument("request.connection.kafka", notification) do
open unless open?
@correlation_id += 1
- # Look up the API name.
- notification[:api] = Protocol.api_name(request.api_key)
-
- # We may not read a response, in which case the size is zero.
- notification[:response_size] = 0
-
write_request(request, notification)
+
+ response_class = request.response_class
wait_for_response(response_class, notification) unless response_class.nil?
end
rescue Errno::EPIPE, Errno::ECONNRESET, Errno::ETIMEDOUT, EOFError => e
close