lib/kafka/connection.rb in ruby-kafka-0.3.18.beta1 vs lib/kafka/connection.rb in ruby-kafka-0.3.18.beta2
- old
+ new
@@ -1,44 +1,14 @@
require "stringio"
require "kafka/socket_with_timeout"
require "kafka/ssl_socket_with_timeout"
require "kafka/protocol/request_message"
-require "kafka/protocol/null_response"
require "kafka/protocol/encoder"
require "kafka/protocol/decoder"
module Kafka
- # An asynchronous response object allows us to deliver a response at some
- # later point in time.
- #
- # When instantiating an AsyncResponse, you provide a response decoder and
- # a block that will force the caller to wait until a response is available.
- class AsyncResponse
- # Use a custom "nil" value so that nil can be an actual value.
- MISSING = Object.new
-
- def initialize(decoder, &block)
- @decoder = decoder
- @block = block
- @response = MISSING
- end
-
- # Block until a response is available.
- def call
- @block.call if @response == MISSING
- @response
- end
-
- # Deliver the response data.
- #
- # After calling this, `#call` will returned the decoded response.
- def deliver(data)
- @response = @decoder.decode(data)
- end
- end
-
# A connection to a single Kafka broker.
#
# Usually you'll need a separate connection to each broker in a cluster, since most
# requests must be directed specifically to the broker that is currently leader for
# the set of topic partitions you want to produce to or consume from.
@@ -106,65 +76,27 @@
# @param request [#encode, #response_class] the request that should be
# encoded and written.
#
# @return [Object] the response.
def send_request(request)
- # Immediately block on the asynchronous request.
- send_async_request(request).call
- end
-
- # Sends a request over the connection.
- #
- # @param request [#encode, #response_class] the request that should be
- # encoded and written.
- #
- # @return [AsyncResponse] the async response, allowing the caller to choose
- # when to block.
- def send_async_request(request)
# Default notification payload.
notification = {
broker_host: @host,
api: Protocol.api_name(request.api_key),
request_size: 0,
response_size: 0,
}
- @instrumenter.start("request.connection", notification)
+ @instrumenter.instrument("request.connection", notification) do
+ open unless open?
- open unless open?
+ @correlation_id += 1
- @correlation_id += 1
+ write_request(request, notification)
- write_request(request, notification)
-
- response_class = request.response_class
- correlation_id = @correlation_id
-
- if response_class.nil?
- async_response = AsyncResponse.new(Protocol::NullResponse) { nil }
-
- # Immediately deliver a nil value.
- async_response.deliver(nil)
-
- @instrumenter.finish("request.connection", notification)
-
- async_response
- else
- async_response = AsyncResponse.new(response_class) {
- # A caller is trying to read the response, so we have to wait for it
- # before we can return.
- wait_for_response(correlation_id, notification)
-
- # Once done, we can finish the instrumentation.
- @instrumenter.finish("request.connection", notification)
- }
-
- # Store the asynchronous response so that data can be delivered to it
- # at a later time.
- @pending_async_responses[correlation_id] = async_response
-
- async_response
+ response_class = request.response_class
+ wait_for_response(response_class, notification) unless response_class.nil?
end
rescue Errno::EPIPE, Errno::ECONNRESET, Errno::ETIMEDOUT, EOFError => e
close
raise ConnectionError, "Connection error: #{e}"
@@ -184,13 +116,10 @@
@encoder = Kafka::Protocol::Encoder.new(@socket)
@decoder = Kafka::Protocol::Decoder.new(@socket)
# Correlation id is initialized to zero and bumped for each request.
@correlation_id = 0
-
- # The pipeline of pending response futures must be reset.
- @pending_async_responses = {}
rescue Errno::ETIMEDOUT => e
@logger.error "Timed out while trying to connect to #{self}: #{e}"
raise ConnectionError, e
rescue SocketError, Errno::ECONNREFUSED, Errno::EHOSTUNREACH => e
@logger.error "Failed to connect to #{self}: #{e}"
@@ -228,61 +157,44 @@
#
# @param response_class [#decode] an object that can decode the response from
# a given Decoder.
#
# @return [nil]
- def read_response(expected_correlation_id, notification)
- @logger.debug "Waiting for response #{expected_correlation_id} from #{to_s}"
+ def read_response(response_class, notification)
+ @logger.debug "Waiting for response #{@correlation_id} from #{to_s}"
data = @decoder.bytes
notification[:response_size] = data.bytesize
buffer = StringIO.new(data)
response_decoder = Kafka::Protocol::Decoder.new(buffer)
correlation_id = response_decoder.int32
+ response = response_class.decode(response_decoder)
@logger.debug "Received response #{correlation_id} from #{to_s}"
- return correlation_id, response_decoder
+ return correlation_id, response
rescue Errno::ETIMEDOUT
- @logger.error "Timed out while waiting for response #{expected_correlation_id}"
+ @logger.error "Timed out while waiting for response #{@correlation_id}"
raise
- rescue Errno::EPIPE, Errno::ECONNRESET, Errno::ETIMEDOUT, EOFError => e
- close
-
- raise ConnectionError, "Connection error: #{e}"
end
- def wait_for_response(expected_correlation_id, notification)
+ def wait_for_response(response_class, notification)
loop do
- correlation_id, data = read_response(expected_correlation_id, notification)
+ correlation_id, response = read_response(response_class, notification)
- if correlation_id < expected_correlation_id
- # There may have been a previous request that timed out before the client
- # was able to read the response. In that case, the response will still be
- # sitting in the socket waiting to be read. If the response we just read
- # was to a previous request, we deliver it to the pending async response
- # future.
- async_response = @pending_async_responses.delete(correlation_id)
- async_response.deliver(data) if async_response
- elsif correlation_id > expected_correlation_id
- raise Kafka::Error, "Correlation id mismatch: expected #{expected_correlation_id} but got #{correlation_id}"
+ # There may have been a previous request that timed out before the client
+ # was able to read the response. In that case, the response will still be
+ # sitting in the socket waiting to be read. If the response we just read
+ # was to a previous request, we can safely skip it.
+ if correlation_id < @correlation_id
+ @logger.error "Received out-of-order response id #{correlation_id}, was expecting #{@correlation_id}"
+ elsif correlation_id > @correlation_id
+ raise Kafka::Error, "Correlation id mismatch: expected #{@correlation_id} but got #{correlation_id}"
else
- # If the request was asynchronous, deliver the response to the pending
- # async response future.
- async_response = @pending_async_responses.delete(correlation_id)
- async_response.deliver(data)
-
- return async_response.call
+ return response
end
end
- rescue Errno::EPIPE, Errno::ECONNRESET, Errno::ETIMEDOUT, EOFError => e
- notification[:exception] = [e.class.name, e.message]
- notification[:exception_object] = e
-
- close
-
- raise ConnectionError, "Connection error: #{e}"
end
end
end