lib/kafka/connection.rb in ruby-kafka-0.1.0.pre.beta1 vs lib/kafka/connection.rb in ruby-kafka-0.1.0.pre.beta2

- old
+ new

@@ -42,11 +42,11 @@ @encoder = Kafka::Protocol::Encoder.new(@socket) @decoder = Kafka::Protocol::Decoder.new(@socket) # Correlation id is initialized to zero and bumped for each request. @correlation_id = 0 - rescue Errno::ETIMEDOUT + rescue Errno::ETIMEDOUT => e @logger.error "Timed out while trying to connect to #{host}:#{port}: #{e}" raise ConnectionError, e rescue SocketError, Errno::ECONNREFUSED => e @logger.error "Failed to connect to #{host}:#{port}: #{e}" raise ConnectionError, e @@ -65,16 +65,30 @@ # # @param api_key [Integer] the integer code for the API that is invoked. # @param request [#encode] the request that should be encoded and written. # @param response_class [#decode] an object that can decode the response. # - # @return [Object] the response that was decoded by `response_class`. + # @return [Object] the response that was decoded by +response_class+. def request(api_key, request, response_class) write_request(api_key, request) unless response_class.nil? - read_response(response_class) + loop do + correlation_id, response = read_response(response_class) + + # There may have been a previous request that timed out before the client + # was able to read the response. In that case, the response will still be + # sitting in the socket waiting to be read. If the response we just read + # was to a previous request, we can safely skip it. + if correlation_id < @correlation_id + @logger.error "Received out-of-order response id #{correlation_id}, was expecting #{@correlation_id}" + elsif correlation_id > @correlation_id + raise Kafka::Error, "Correlation id mismatch: expected #{@correlation_id} but got #{correlation_id}" + else + break response + end + end end end private @@ -129,9 +143,9 @@ correlation_id = response_decoder.int32 response = response_class.decode(response_decoder) @logger.debug "Received response #{correlation_id} from #{to_s}" - response + return correlation_id, response end end end