lib/riak/client/beefcake/protocol.rb in riak-client-2.3.2 vs lib/riak/client/beefcake/protocol.rb in riak-client-2.4.0.pre1

- old
+ new

@@ -6,14 +6,20 @@ module Riak class Client class BeefcakeProtobuffsBackend < ProtobuffsBackend class Protocol include Riak::Util::Translation - attr_reader :socket + attr_reader :socket, :read_timeout, :write_timeout - def initialize(socket) + # @param [Socket] + # @param [Hash] options + # @option options [Numeric] :read_timeout (nil) The read timeout, in seconds + # @option options [Numeric] :write_timeout (nil) The write timeout, in seconds + def initialize(socket, options = {}) @socket = socket + @read_timeout = options[:read_timeout] + @write_timeout = options[:write_timeout] end # Encodes and writes a Riak-formatted message, including protocol buffer # payload if given. # @@ -30,18 +36,39 @@ header = [serialized.length + 1, code].pack 'NC' payload = header + serialized - socket.write payload + if write_timeout + begin + loop do + bytes_written = socket.write_nonblock(payload) + # write_nonblock doesn't guarantee to write all data at once, + # so check if there are bytes left to be written + break if bytes_written >= payload.bytesize + payload.slice!(0, bytes_written) + end + rescue IO::WaitWritable, Errno::EINTR + # wait with the retry until socket is writable again + unless IO.select(nil, [socket], nil, write_timeout) + raise Errno::ETIMEDOUT, 'write timeout' + end + retry + end + else + socket.write(payload) + end socket.flush end # Receives a Riak-formatted message, and returns the symbolic name of # the message along with the string payload from the network. # # @return [Array<Symbol, String>] def receive + if read_timeout && !IO.select([socket], nil, nil, read_timeout) + raise Errno::ETIMEDOUT, 'read timeout' + end header = socket.read 5 raise ProtobuffsFailedHeader.new if header.nil? message_length, code = header.unpack 'NC' body_length = message_length - 1