lib/kafka/connection.rb in ruby-kafka-0.1.0.pre.alpha2 vs lib/kafka/connection.rb in ruby-kafka-0.1.0.pre.beta1
- old
+ new
@@ -9,45 +9,60 @@
#
# Usually you'll need a separate connection to each broker in a cluster, since most
# requests must be directed specifically to the broker that is currently leader for
# the set of topic partitions you want to produce to or consumer from.
class Connection
- API_VERSION = 0
+ SOCKET_TIMEOUT = 5
+ CONNECT_TIMEOUT = 10
# Opens a connection to a Kafka broker.
#
# @param host [String] the hostname of the broker.
# @param port [Integer] the port of the broker.
# @param client_id [String] the client id is a user-specified string sent in each
# request to help trace calls and should logically identify the application
# making the request.
# @param logger [Logger] the logger used to log trace messages.
+ # @param connect_timeout [Integer] the socket timeout for connecting to the broker.
+ # Default is 10 seconds.
+ # @param socket_timeout [Integer] the socket timeout for reading and writing to the
+ # broker. Default is 5 seconds.
#
# @return [Connection] a new connection.
- def initialize(host:, port:, client_id:, logger:)
+ def initialize(host:, port:, client_id:, logger:, connect_timeout: nil, socket_timeout: nil)
@host, @port, @client_id = host, port, client_id
@logger = logger
+ @connect_timeout = connect_timeout || CONNECT_TIMEOUT
+ @socket_timeout = socket_timeout || SOCKET_TIMEOUT
+
@logger.info "Opening connection to #{@host}:#{@port} with client id #{@client_id}..."
- @socket = TCPSocket.new(host, port)
+ @socket = Socket.tcp(host, port, connect_timeout: @connect_timeout)
@encoder = Kafka::Protocol::Encoder.new(@socket)
@decoder = Kafka::Protocol::Decoder.new(@socket)
# Correlation id is initialized to zero and bumped for each request.
@correlation_id = 0
- rescue SocketError => e
+ rescue Errno::ETIMEDOUT
+ @logger.error "Timed out while trying to connect to #{host}:#{port}: #{e}"
+ raise ConnectionError, e
+ rescue SocketError, Errno::ECONNREFUSED => e
@logger.error "Failed to connect to #{host}:#{port}: #{e}"
-
raise ConnectionError, e
end
def to_s
"#{@host}:#{@port}"
end
+ def close
+ @logger.debug "Closing socket to #{to_s}"
+ @socket.close
+ end
+
# Sends a request over the connection.
#
# @param api_key [Integer] the integer code for the API that is invoked.
# @param request [#encode] the request that should be encoded and written.
# @param response_class [#decode] an object that can decode the response.
@@ -73,17 +88,22 @@
@correlation_id += 1
@logger.debug "Sending request #{@correlation_id} to #{to_s}"
message = Kafka::Protocol::RequestMessage.new(
api_key: api_key,
- api_version: API_VERSION,
correlation_id: @correlation_id,
client_id: @client_id,
request: request,
)
data = Kafka::Protocol::Encoder.encode_with(message)
+
+ unless IO.select(nil, [@socket], nil, @socket_timeout)
+ @logger.error "Timed out while writing request #{@correlation_id}"
+ raise ConnectionError
+ end
+
@encoder.write_bytes(data)
nil
end
@@ -93,9 +113,14 @@
# a given Decoder.
#
# @return [nil]
def read_response(response_class)
@logger.debug "Waiting for response #{@correlation_id} from #{to_s}"
+
+ unless IO.select([@socket], nil, nil, @socket_timeout)
+ @logger.error "Timed out while waiting for response #{@correlation_id}"
+ raise ConnectionError
+ end
bytes = @decoder.bytes
buffer = StringIO.new(bytes)
response_decoder = Kafka::Protocol::Decoder.new(buffer)