lib/kafka/connection.rb in ruby-kafka-0.1.0.pre.beta1 vs lib/kafka/connection.rb in ruby-kafka-0.1.0.pre.beta2
- old
+ new
@@ -42,11 +42,11 @@
@encoder = Kafka::Protocol::Encoder.new(@socket)
@decoder = Kafka::Protocol::Decoder.new(@socket)
# Correlation id is initialized to zero and bumped for each request.
@correlation_id = 0
- rescue Errno::ETIMEDOUT
+ rescue Errno::ETIMEDOUT => e
@logger.error "Timed out while trying to connect to #{host}:#{port}: #{e}"
raise ConnectionError, e
rescue SocketError, Errno::ECONNREFUSED => e
@logger.error "Failed to connect to #{host}:#{port}: #{e}"
raise ConnectionError, e
@@ -65,16 +65,30 @@
#
# @param api_key [Integer] the integer code for the API that is invoked.
# @param request [#encode] the request that should be encoded and written.
# @param response_class [#decode] an object that can decode the response.
#
- # @return [Object] the response that was decoded by `response_class`.
+ # @return [Object] the response that was decoded by +response_class+.
def request(api_key, request, response_class)
write_request(api_key, request)
unless response_class.nil?
- read_response(response_class)
+ loop do
+ correlation_id, response = read_response(response_class)
+
+ # There may have been a previous request that timed out before the client
+ # was able to read the response. In that case, the response will still be
+ # sitting in the socket waiting to be read. If the response we just read
+ # was to a previous request, we can safely skip it.
+ if correlation_id < @correlation_id
+ @logger.error "Received out-of-order response id #{correlation_id}, was expecting #{@correlation_id}"
+ elsif correlation_id > @correlation_id
+ raise Kafka::Error, "Correlation id mismatch: expected #{@correlation_id} but got #{correlation_id}"
+ else
+ break response
+ end
+ end
end
end
private
@@ -129,9 +143,9 @@
correlation_id = response_decoder.int32
response = response_class.decode(response_decoder)
@logger.debug "Received response #{correlation_id} from #{to_s}"
- response
+ return correlation_id, response
end
end
end