lib/kafka/connection.rb in ruby-kafka-0.3.18.beta1 vs lib/kafka/connection.rb in ruby-kafka-0.3.18.beta2

- old
+ new

@@ -1,44 +1,14 @@ require "stringio" require "kafka/socket_with_timeout" require "kafka/ssl_socket_with_timeout" require "kafka/protocol/request_message" -require "kafka/protocol/null_response" require "kafka/protocol/encoder" require "kafka/protocol/decoder" module Kafka - # An asynchronous response object allows us to deliver a response at some - # later point in time. - # - # When instantiating an AsyncResponse, you provide a response decoder and - # a block that will force the caller to wait until a response is available. - class AsyncResponse - # Use a custom "nil" value so that nil can be an actual value. - MISSING = Object.new - - def initialize(decoder, &block) - @decoder = decoder - @block = block - @response = MISSING - end - - # Block until a response is available. - def call - @block.call if @response == MISSING - @response - end - - # Deliver the response data. - # - # After calling this, `#call` will returned the decoded response. - def deliver(data) - @response = @decoder.decode(data) - end - end - # A connection to a single Kafka broker. # # Usually you'll need a separate connection to each broker in a cluster, since most # requests must be directed specifically to the broker that is currently leader for # the set of topic partitions you want to produce to or consume from. @@ -106,65 +76,27 @@ # @param request [#encode, #response_class] the request that should be # encoded and written. # # @return [Object] the response. def send_request(request) - # Immediately block on the asynchronous request. - send_async_request(request).call - end - - # Sends a request over the connection. - # - # @param request [#encode, #response_class] the request that should be - # encoded and written. - # - # @return [AsyncResponse] the async response, allowing the caller to choose - # when to block. - def send_async_request(request) # Default notification payload. notification = { broker_host: @host, api: Protocol.api_name(request.api_key), request_size: 0, response_size: 0, } - @instrumenter.start("request.connection", notification) + @instrumenter.instrument("request.connection", notification) do + open unless open? - open unless open? + @correlation_id += 1 - @correlation_id += 1 + write_request(request, notification) - write_request(request, notification) - - response_class = request.response_class - correlation_id = @correlation_id - - if response_class.nil? - async_response = AsyncResponse.new(Protocol::NullResponse) { nil } - - # Immediately deliver a nil value. - async_response.deliver(nil) - - @instrumenter.finish("request.connection", notification) - - async_response - else - async_response = AsyncResponse.new(response_class) { - # A caller is trying to read the response, so we have to wait for it - # before we can return. - wait_for_response(correlation_id, notification) - - # Once done, we can finish the instrumentation. - @instrumenter.finish("request.connection", notification) - } - - # Store the asynchronous response so that data can be delivered to it - # at a later time. - @pending_async_responses[correlation_id] = async_response - - async_response + response_class = request.response_class + wait_for_response(response_class, notification) unless response_class.nil? end rescue Errno::EPIPE, Errno::ECONNRESET, Errno::ETIMEDOUT, EOFError => e close raise ConnectionError, "Connection error: #{e}" @@ -184,13 +116,10 @@ @encoder = Kafka::Protocol::Encoder.new(@socket) @decoder = Kafka::Protocol::Decoder.new(@socket) # Correlation id is initialized to zero and bumped for each request. @correlation_id = 0 - - # The pipeline of pending response futures must be reset. - @pending_async_responses = {} rescue Errno::ETIMEDOUT => e @logger.error "Timed out while trying to connect to #{self}: #{e}" raise ConnectionError, e rescue SocketError, Errno::ECONNREFUSED, Errno::EHOSTUNREACH => e @logger.error "Failed to connect to #{self}: #{e}" @@ -228,61 +157,44 @@ # # @param response_class [#decode] an object that can decode the response from # a given Decoder. # # @return [nil] - def read_response(expected_correlation_id, notification) - @logger.debug "Waiting for response #{expected_correlation_id} from #{to_s}" + def read_response(response_class, notification) + @logger.debug "Waiting for response #{@correlation_id} from #{to_s}" data = @decoder.bytes notification[:response_size] = data.bytesize buffer = StringIO.new(data) response_decoder = Kafka::Protocol::Decoder.new(buffer) correlation_id = response_decoder.int32 + response = response_class.decode(response_decoder) @logger.debug "Received response #{correlation_id} from #{to_s}" - return correlation_id, response_decoder + return correlation_id, response rescue Errno::ETIMEDOUT - @logger.error "Timed out while waiting for response #{expected_correlation_id}" + @logger.error "Timed out while waiting for response #{@correlation_id}" raise - rescue Errno::EPIPE, Errno::ECONNRESET, Errno::ETIMEDOUT, EOFError => e - close - - raise ConnectionError, "Connection error: #{e}" end - def wait_for_response(expected_correlation_id, notification) + def wait_for_response(response_class, notification) loop do - correlation_id, data = read_response(expected_correlation_id, notification) + correlation_id, response = read_response(response_class, notification) - if correlation_id < expected_correlation_id - # There may have been a previous request that timed out before the client - # was able to read the response. In that case, the response will still be - # sitting in the socket waiting to be read. If the response we just read - # was to a previous request, we deliver it to the pending async response - # future. - async_response = @pending_async_responses.delete(correlation_id) - async_response.deliver(data) if async_response - elsif correlation_id > expected_correlation_id - raise Kafka::Error, "Correlation id mismatch: expected #{expected_correlation_id} but got #{correlation_id}" + # There may have been a previous request that timed out before the client + # was able to read the response. In that case, the response will still be + # sitting in the socket waiting to be read. If the response we just read + # was to a previous request, we can safely skip it. + if correlation_id < @correlation_id + @logger.error "Received out-of-order response id #{correlation_id}, was expecting #{@correlation_id}" + elsif correlation_id > @correlation_id + raise Kafka::Error, "Correlation id mismatch: expected #{@correlation_id} but got #{correlation_id}" else - # If the request was asynchronous, deliver the response to the pending - # async response future. - async_response = @pending_async_responses.delete(correlation_id) - async_response.deliver(data) - - return async_response.call + return response end end - rescue Errno::EPIPE, Errno::ECONNRESET, Errno::ETIMEDOUT, EOFError => e - notification[:exception] = [e.class.name, e.message] - notification[:exception_object] = e - - close - - raise ConnectionError, "Connection error: #{e}" end end end