lib/riak/client/rpc.rb in riakpb-0.1.5 vs lib/riak/client/rpc.rb in riakpb-0.1.6
- old
+ new
@@ -12,11 +12,11 @@
include Riak::Util::MessageCode
include Riak::Util::Translation
include Riak::Util::Encode
include Riak::Util::Decode
- RECV_LIMIT=1638400
+ RECV_LIMIT=1073741824
attr_reader :req_message, :response, :resp_message_codes, :resp_message, :status
# Establishes a Client ID with the Riak node, for the life of the RPC connection.
# @param [Client] client the Riak::Client object in which this Rpc instance lives
@@ -28,24 +28,25 @@
@client_id = request(Util::MessageCode::GET_CLIENT_ID_REQUEST).client_id
@set_client_id = Riak::RpbSetClientIdReq.new(:client_id => @client_id)
# Request / Response Data
@resp_message_codes = -1
- @resp_message = ''
+ @resp_message = []
@req_message_code = -1
@req_message = ''
@response = ''
end
# Clears the request / response data, in preparation for a new request
def clear
@resp_message_codes = -1
- @resp_message = ''
+ @resp_message = []
@req_message_code = -1
@req_message = ''
@response = ''
@status = false
+ @buffer = ''
end
# Opens a TCPSocket connection with the riak host/node
# @yield [TCPSocket] hands off the socket connection
# @return [TCPSocket] data that was exchanged with the host/node
@@ -91,24 +92,28 @@
rescue NoMethodError
@req_message = assemble_request mc
end
socket.send(@req_message, 0)
- self.response = socket.recv(@limit)
+ self.parse_response socket.recv(@limit)
- end while(false == (@response.done rescue true))
+ end while(false == (@response[:done] rescue true))
end # with_socket
return(@response)
end # stream_request
# Handles the response from the Riak node
# @param [String] value The message returned from the Riak node over the TCP Socket
# @return [Protobuf::Message] @response the processed response (if any) from the Riak node
- def response=(value)
- @resp_message = value
+ def parse_response(value)
+ @resp_message << value
- response_chunk, @resp_message_codes = decode_message(value)
+ value = @buffer + value
+
+# return {:done => false} if message_remaining?(@resp_message)
+
+ response_chunk, @resp_message_codes, @buffer = decode_message(value)
@resp_message_codes.each do |resp_mc|
if resp_mc.equal?(ERROR_RESPONSE)
raise FailedRequest.new(MC_RESPONSE_FOR[@req_message_code], @resp_message_codes, response_chunk)
end