lib/kafka/connection.rb in ruby-kafka-0.1.0.pre.alpha vs lib/kafka/connection.rb in ruby-kafka-0.1.0.pre.alpha2

- old
+ new

@@ -2,66 +2,111 @@ require "kafka/protocol/request_message" require "kafka/protocol/encoder" require "kafka/protocol/decoder" module Kafka - ConnectionError = Class.new(StandardError) + # A connection to a single Kafka broker. + # + # Usually you'll need a separate connection to each broker in a cluster, since most + # requests must be directed specifically to the broker that is currently leader for + # the set of topic partitions you want to produce to or consumer from. class Connection + API_VERSION = 0 + + # Opens a connection to a Kafka broker. + # + # @param host [String] the hostname of the broker. + # @param port [Integer] the port of the broker. + # @param client_id [String] the client id is a user-specified string sent in each + # request to help trace calls and should logically identify the application + # making the request. + # @param logger [Logger] the logger used to log trace messages. + # + # @return [Connection] a new connection. def initialize(host:, port:, client_id:, logger:) - @host = host - @port = port - @client_id = client_id + @host, @port, @client_id = host, port, client_id @logger = logger - @socket = nil - @correlation_id = 0 - @socket_timeout = 1000 - end - def open @logger.info "Opening connection to #{@host}:#{@port} with client id #{@client_id}..." - @socket = TCPSocket.new(@host, @port) + @socket = TCPSocket.new(host, port) + + @encoder = Kafka::Protocol::Encoder.new(@socket) + @decoder = Kafka::Protocol::Decoder.new(@socket) + + # Correlation id is initialized to zero and bumped for each request. + @correlation_id = 0 rescue SocketError => e - @logger.error "Failed to connect to #{@host}:#{@port}: #{e}" + @logger.error "Failed to connect to #{host}:#{port}: #{e}" raise ConnectionError, e end + def to_s + "#{@host}:#{@port}" + end + + # Sends a request over the connection. + # + # @param api_key [Integer] the integer code for the API that is invoked. + # @param request [#encode] the request that should be encoded and written. + # @param response_class [#decode] an object that can decode the response. + # + # @return [Object] the response that was decoded by `response_class`. + def request(api_key, request, response_class) + write_request(api_key, request) + + unless response_class.nil? + read_response(response_class) + end + end + + private + + # Writes a request over the connection. + # + # @param api_key [Integer] the integer code for the API that is invoked. + # @param request [#encode] the request that should be encoded and written. + # + # @return [nil] def write_request(api_key, request) @correlation_id += 1 + @logger.debug "Sending request #{@correlation_id} to #{to_s}" message = Kafka::Protocol::RequestMessage.new( api_key: api_key, - api_version: 0, + api_version: API_VERSION, correlation_id: @correlation_id, client_id: @client_id, request: request, ) - buffer = StringIO.new - message_encoder = Kafka::Protocol::Encoder.new(buffer) - message.encode(message_encoder) + data = Kafka::Protocol::Encoder.encode_with(message) + @encoder.write_bytes(data) - @logger.info "Sending request #{@correlation_id} (#{request.class})..." - - connection_encoder = Kafka::Protocol::Encoder.new(@socket) - connection_encoder.write_bytes(buffer.string) + nil end - def read_response(response) - @logger.info "Reading response #{response.class}" + # Reads a response from the connection. + # + # @param response_class [#decode] an object that can decode the response from + # a given Decoder. + # + # @return [nil] + def read_response(response_class) + @logger.debug "Waiting for response #{@correlation_id} from #{to_s}" - connection_decoder = Kafka::Protocol::Decoder.new(@socket) - bytes = connection_decoder.bytes + bytes = @decoder.bytes buffer = StringIO.new(bytes) response_decoder = Kafka::Protocol::Decoder.new(buffer) correlation_id = response_decoder.int32 + response = response_class.decode(response_decoder) - @logger.info "Correlation id #{correlation_id}" + @logger.debug "Received response #{correlation_id} from #{to_s}" - response.decode(response_decoder) + response end end end