lib/kafka/connection.rb in ruby-kafka-0.1.0.pre.alpha2 vs lib/kafka/connection.rb in ruby-kafka-0.1.0.pre.beta1

- old
+ new

@@ -9,45 +9,60 @@ # # Usually you'll need a separate connection to each broker in a cluster, since most # requests must be directed specifically to the broker that is currently leader for # the set of topic partitions you want to produce to or consumer from. class Connection - API_VERSION = 0 + SOCKET_TIMEOUT = 5 + CONNECT_TIMEOUT = 10 # Opens a connection to a Kafka broker. # # @param host [String] the hostname of the broker. # @param port [Integer] the port of the broker. # @param client_id [String] the client id is a user-specified string sent in each # request to help trace calls and should logically identify the application # making the request. # @param logger [Logger] the logger used to log trace messages. + # @param connect_timeout [Integer] the socket timeout for connecting to the broker. + # Default is 10 seconds. + # @param socket_timeout [Integer] the socket timeout for reading and writing to the + # broker. Default is 5 seconds. # # @return [Connection] a new connection. - def initialize(host:, port:, client_id:, logger:) + def initialize(host:, port:, client_id:, logger:, connect_timeout: nil, socket_timeout: nil) @host, @port, @client_id = host, port, client_id @logger = logger + @connect_timeout = connect_timeout || CONNECT_TIMEOUT + @socket_timeout = socket_timeout || SOCKET_TIMEOUT + @logger.info "Opening connection to #{@host}:#{@port} with client id #{@client_id}..." - @socket = TCPSocket.new(host, port) + @socket = Socket.tcp(host, port, connect_timeout: @connect_timeout) @encoder = Kafka::Protocol::Encoder.new(@socket) @decoder = Kafka::Protocol::Decoder.new(@socket) # Correlation id is initialized to zero and bumped for each request. @correlation_id = 0 - rescue SocketError => e + rescue Errno::ETIMEDOUT + @logger.error "Timed out while trying to connect to #{host}:#{port}: #{e}" + raise ConnectionError, e + rescue SocketError, Errno::ECONNREFUSED => e @logger.error "Failed to connect to #{host}:#{port}: #{e}" - raise ConnectionError, e end def to_s "#{@host}:#{@port}" end + def close + @logger.debug "Closing socket to #{to_s}" + @socket.close + end + # Sends a request over the connection. # # @param api_key [Integer] the integer code for the API that is invoked. # @param request [#encode] the request that should be encoded and written. # @param response_class [#decode] an object that can decode the response. @@ -73,17 +88,22 @@ @correlation_id += 1 @logger.debug "Sending request #{@correlation_id} to #{to_s}" message = Kafka::Protocol::RequestMessage.new( api_key: api_key, - api_version: API_VERSION, correlation_id: @correlation_id, client_id: @client_id, request: request, ) data = Kafka::Protocol::Encoder.encode_with(message) + + unless IO.select(nil, [@socket], nil, @socket_timeout) + @logger.error "Timed out while writing request #{@correlation_id}" + raise ConnectionError + end + @encoder.write_bytes(data) nil end @@ -93,9 +113,14 @@ # a given Decoder. # # @return [nil] def read_response(response_class) @logger.debug "Waiting for response #{@correlation_id} from #{to_s}" + + unless IO.select([@socket], nil, nil, @socket_timeout) + @logger.error "Timed out while waiting for response #{@correlation_id}" + raise ConnectionError + end bytes = @decoder.bytes buffer = StringIO.new(bytes) response_decoder = Kafka::Protocol::Decoder.new(buffer)