lib/kafka/connection.rb in ruby-kafka-0.1.0.pre.beta4 vs lib/kafka/connection.rb in ruby-kafka-0.1.0.pre.beta5
- old
+ new
@@ -1,7 +1,7 @@
-require "socket"
require "stringio"
+require "kafka/socket_with_timeout"
require "kafka/protocol/request_message"
require "kafka/protocol/encoder"
require "kafka/protocol/decoder"
module Kafka
@@ -36,11 +36,11 @@
@connect_timeout = connect_timeout || CONNECT_TIMEOUT
@socket_timeout = socket_timeout || SOCKET_TIMEOUT
@logger.info "Opening connection to #{@host}:#{@port} with client id #{@client_id}..."
- @socket = Socket.tcp(host, port, connect_timeout: @connect_timeout)
+ @socket = SocketWithTimeout.new(@host, @port, timeout: @connect_timeout)
@encoder = Kafka::Protocol::Encoder.new(@socket)
@decoder = Kafka::Protocol::Decoder.new(@socket)
# Correlation id is initialized to zero and bumped for each request.
@@ -110,18 +110,16 @@
request: request,
)
data = Kafka::Protocol::Encoder.encode_with(message)
- unless IO.select(nil, [@socket], nil, @socket_timeout)
- @logger.error "Timed out while writing request #{@correlation_id}"
- raise ConnectionError
- end
-
@encoder.write_bytes(data)
nil
+ rescue Errno::ETIMEDOUT
+ @logger.error "Timed out while writing request #{@correlation_id}"
+ raise ConnectionError
end
# Reads a response from the connection.
#
# @param response_class [#decode] an object that can decode the response from
@@ -129,15 +127,10 @@
#
# @return [nil]
def read_response(response_class)
@logger.debug "Waiting for response #{@correlation_id} from #{to_s}"
- unless IO.select([@socket], nil, nil, @socket_timeout)
- @logger.error "Timed out while waiting for response #{@correlation_id}"
- raise ConnectionError
- end
-
bytes = @decoder.bytes
buffer = StringIO.new(bytes)
response_decoder = Kafka::Protocol::Decoder.new(buffer)
@@ -145,8 +138,11 @@
response = response_class.decode(response_decoder)
@logger.debug "Received response #{correlation_id} from #{to_s}"
return correlation_id, response
+ rescue Errno::ETIMEDOUT
+ @logger.error "Timed out while waiting for response #{@correlation_id}"
+ raise ConnectionError
end
end
end