lib/kafka/connection.rb in ruby-kafka-0.1.0.pre.beta4 vs lib/kafka/connection.rb in ruby-kafka-0.1.0.pre.beta5

- old
+ new

@@ -1,7 +1,7 @@ -require "socket" require "stringio" +require "kafka/socket_with_timeout" require "kafka/protocol/request_message" require "kafka/protocol/encoder" require "kafka/protocol/decoder" module Kafka @@ -36,11 +36,11 @@ @connect_timeout = connect_timeout || CONNECT_TIMEOUT @socket_timeout = socket_timeout || SOCKET_TIMEOUT @logger.info "Opening connection to #{@host}:#{@port} with client id #{@client_id}..." - @socket = Socket.tcp(host, port, connect_timeout: @connect_timeout) + @socket = SocketWithTimeout.new(@host, @port, timeout: @connect_timeout) @encoder = Kafka::Protocol::Encoder.new(@socket) @decoder = Kafka::Protocol::Decoder.new(@socket) # Correlation id is initialized to zero and bumped for each request. @@ -110,18 +110,16 @@ request: request, ) data = Kafka::Protocol::Encoder.encode_with(message) - unless IO.select(nil, [@socket], nil, @socket_timeout) - @logger.error "Timed out while writing request #{@correlation_id}" - raise ConnectionError - end - @encoder.write_bytes(data) nil + rescue Errno::ETIMEDOUT + @logger.error "Timed out while writing request #{@correlation_id}" + raise ConnectionError end # Reads a response from the connection. # # @param response_class [#decode] an object that can decode the response from @@ -129,15 +127,10 @@ # # @return [nil] def read_response(response_class) @logger.debug "Waiting for response #{@correlation_id} from #{to_s}" - unless IO.select([@socket], nil, nil, @socket_timeout) - @logger.error "Timed out while waiting for response #{@correlation_id}" - raise ConnectionError - end - bytes = @decoder.bytes buffer = StringIO.new(bytes) response_decoder = Kafka::Protocol::Decoder.new(buffer) @@ -145,8 +138,11 @@ response = response_class.decode(response_decoder) @logger.debug "Received response #{correlation_id} from #{to_s}" return correlation_id, response + rescue Errno::ETIMEDOUT + @logger.error "Timed out while waiting for response #{@correlation_id}" + raise ConnectionError end end end