lib/ruby_skynet/connection.rb in ruby_skynet-0.2.0 vs lib/ruby_skynet/connection.rb in ruby_skynet-0.3.0

- old
+ new

@@ -1,8 +1,9 @@ require 'bson' require 'gene_pool' require 'thread_safe' +require 'resilient_socket' # # RubySkynet Connection # # Handles connecting to Skynet Servers as a host:port pair @@ -78,11 +79,11 @@ # close immediately and the client should look elsewhere for this service. # # ClientID string # ClientID is a UUID that is used by the client to identify itself in RPC requests. @logger.debug "Waiting for Service Handshake" - service_handshake = self.class.read_bson_document(socket) + service_handshake = Common.read_bson_document(socket) @logger.trace 'Service Handshake', service_handshake # #TODO When a reconnect returns registered == false need to throw an exception # so that this host connection is not used registered = service_handshake['registered'] @@ -157,31 +158,31 @@ socket.write(BSON.serialize(request)) # Since Send does not affect state on the server we can also retry reads if idempotent @logger.debug "Reading header from server" - header = self.class.read_bson_document(socket) + header = Common.read_bson_document(socket) @logger.debug 'Response Header', header # Read the BSON response document @logger.debug "Reading response from server" - response = self.class.read_bson_document(socket) + response = Common.read_bson_document(socket) @logger.trace 'Response', response end end # Perform the read outside the retry block since a successful write # means that the servers state may have been changed unless idempotent # Read header first as a separate BSON document @logger.debug "Reading header from server" - header = self.class.read_bson_document(socket) + header = Common.read_bson_document(socket) @logger.debug 'Response Header', header # Read the BSON response document @logger.debug "Reading response from server" - response = self.class.read_bson_document(socket) + response = Common.read_bson_document(socket) @logger.trace 'Response', response end # Ensure the sequence number in the response header matches the # sequence number sent in the request @@ -217,26 +218,9 @@ @socket.close if @socket end ######################## protected - - # Returns a BSON document read from the socket. - # Returns nil if the operation times out or if a network - # connection failure occurs - def self.read_bson_document(socket) - bytebuf = BSON::ByteBuffer.new - # Read 4 byte size of following BSON document - bytes = socket.read(4) - - # Read BSON document - sz = bytes.unpack("V")[0] - raise "Invalid Data received from server:#{bytes.inspect}" unless sz - - bytebuf.append!(bytes) - bytebuf.append!(socket.read(sz - 4)) - return BSON.deserialize(bytebuf) - end # Returns a new connection pool for the specified server def self.new_connection_pool(server, params={}) # Connection pool configuration options config = pool_config.dup