lib/riak/client/beefcake/protocol.rb in riak-client-2.3.2 vs lib/riak/client/beefcake/protocol.rb in riak-client-2.4.0.pre1
- old
+ new
@@ -6,14 +6,20 @@
module Riak
class Client
class BeefcakeProtobuffsBackend < ProtobuffsBackend
class Protocol
include Riak::Util::Translation
- attr_reader :socket
+ attr_reader :socket, :read_timeout, :write_timeout
- def initialize(socket)
+ # @param [Socket]
+ # @param [Hash] options
+ # @option options [Numeric] :read_timeout (nil) The read timeout, in seconds
+ # @option options [Numeric] :write_timeout (nil) The write timeout, in seconds
+ def initialize(socket, options = {})
@socket = socket
+ @read_timeout = options[:read_timeout]
+ @write_timeout = options[:write_timeout]
end
# Encodes and writes a Riak-formatted message, including protocol buffer
# payload if given.
#
@@ -30,18 +36,39 @@
header = [serialized.length + 1, code].pack 'NC'
payload = header + serialized
- socket.write payload
+ if write_timeout
+ begin
+ loop do
+ bytes_written = socket.write_nonblock(payload)
+ # write_nonblock doesn't guarantee to write all data at once,
+ # so check if there are bytes left to be written
+ break if bytes_written >= payload.bytesize
+ payload.slice!(0, bytes_written)
+ end
+ rescue IO::WaitWritable, Errno::EINTR
+ # wait with the retry until socket is writable again
+ unless IO.select(nil, [socket], nil, write_timeout)
+ raise Errno::ETIMEDOUT, 'write timeout'
+ end
+ retry
+ end
+ else
+ socket.write(payload)
+ end
socket.flush
end
# Receives a Riak-formatted message, and returns the symbolic name of
# the message along with the string payload from the network.
#
# @return [Array<Symbol, String>]
def receive
+ if read_timeout && !IO.select([socket], nil, nil, read_timeout)
+ raise Errno::ETIMEDOUT, 'read timeout'
+ end
header = socket.read 5
raise ProtobuffsFailedHeader.new if header.nil?
message_length, code = header.unpack 'NC'
body_length = message_length - 1