lib/kafka/connection.rb in ruby-kafka-0.1.0.pre.alpha vs lib/kafka/connection.rb in ruby-kafka-0.1.0.pre.alpha2
- old
+ new
@@ -2,66 +2,111 @@
require "kafka/protocol/request_message"
require "kafka/protocol/encoder"
require "kafka/protocol/decoder"
module Kafka
- ConnectionError = Class.new(StandardError)
+ # A connection to a single Kafka broker.
+ #
+ # Usually you'll need a separate connection to each broker in a cluster, since most
+ # requests must be directed specifically to the broker that is currently leader for
+ # the set of topic partitions you want to produce to or consumer from.
class Connection
+ API_VERSION = 0
+
+ # Opens a connection to a Kafka broker.
+ #
+ # @param host [String] the hostname of the broker.
+ # @param port [Integer] the port of the broker.
+ # @param client_id [String] the client id is a user-specified string sent in each
+ # request to help trace calls and should logically identify the application
+ # making the request.
+ # @param logger [Logger] the logger used to log trace messages.
+ #
+ # @return [Connection] a new connection.
def initialize(host:, port:, client_id:, logger:)
- @host = host
- @port = port
- @client_id = client_id
+ @host, @port, @client_id = host, port, client_id
@logger = logger
- @socket = nil
- @correlation_id = 0
- @socket_timeout = 1000
- end
- def open
@logger.info "Opening connection to #{@host}:#{@port} with client id #{@client_id}..."
- @socket = TCPSocket.new(@host, @port)
+ @socket = TCPSocket.new(host, port)
+
+ @encoder = Kafka::Protocol::Encoder.new(@socket)
+ @decoder = Kafka::Protocol::Decoder.new(@socket)
+
+ # Correlation id is initialized to zero and bumped for each request.
+ @correlation_id = 0
rescue SocketError => e
- @logger.error "Failed to connect to #{@host}:#{@port}: #{e}"
+ @logger.error "Failed to connect to #{host}:#{port}: #{e}"
raise ConnectionError, e
end
+ def to_s
+ "#{@host}:#{@port}"
+ end
+
+ # Sends a request over the connection.
+ #
+ # @param api_key [Integer] the integer code for the API that is invoked.
+ # @param request [#encode] the request that should be encoded and written.
+ # @param response_class [#decode] an object that can decode the response.
+ #
+ # @return [Object] the response that was decoded by `response_class`.
+ def request(api_key, request, response_class)
+ write_request(api_key, request)
+
+ unless response_class.nil?
+ read_response(response_class)
+ end
+ end
+
+ private
+
+ # Writes a request over the connection.
+ #
+ # @param api_key [Integer] the integer code for the API that is invoked.
+ # @param request [#encode] the request that should be encoded and written.
+ #
+ # @return [nil]
def write_request(api_key, request)
@correlation_id += 1
+ @logger.debug "Sending request #{@correlation_id} to #{to_s}"
message = Kafka::Protocol::RequestMessage.new(
api_key: api_key,
- api_version: 0,
+ api_version: API_VERSION,
correlation_id: @correlation_id,
client_id: @client_id,
request: request,
)
- buffer = StringIO.new
- message_encoder = Kafka::Protocol::Encoder.new(buffer)
- message.encode(message_encoder)
+ data = Kafka::Protocol::Encoder.encode_with(message)
+ @encoder.write_bytes(data)
- @logger.info "Sending request #{@correlation_id} (#{request.class})..."
-
- connection_encoder = Kafka::Protocol::Encoder.new(@socket)
- connection_encoder.write_bytes(buffer.string)
+ nil
end
- def read_response(response)
- @logger.info "Reading response #{response.class}"
+ # Reads a response from the connection.
+ #
+ # @param response_class [#decode] an object that can decode the response from
+ # a given Decoder.
+ #
+ # @return [nil]
+ def read_response(response_class)
+ @logger.debug "Waiting for response #{@correlation_id} from #{to_s}"
- connection_decoder = Kafka::Protocol::Decoder.new(@socket)
- bytes = connection_decoder.bytes
+ bytes = @decoder.bytes
buffer = StringIO.new(bytes)
response_decoder = Kafka::Protocol::Decoder.new(buffer)
correlation_id = response_decoder.int32
+ response = response_class.decode(response_decoder)
- @logger.info "Correlation id #{correlation_id}"
+ @logger.debug "Received response #{correlation_id} from #{to_s}"
- response.decode(response_decoder)
+ response
end
end
end