lib/ruby_skynet/connection.rb in ruby_skynet-0.2.0 vs lib/ruby_skynet/connection.rb in ruby_skynet-0.3.0
- old
+ new
@@ -1,8 +1,9 @@
require 'bson'
require 'gene_pool'
require 'thread_safe'
+require 'resilient_socket'
#
# RubySkynet Connection
#
# Handles connecting to Skynet Servers as a host:port pair
@@ -78,11 +79,11 @@
# close immediately and the client should look elsewhere for this service.
#
# ClientID string
# ClientID is a UUID that is used by the client to identify itself in RPC requests.
@logger.debug "Waiting for Service Handshake"
- service_handshake = self.class.read_bson_document(socket)
+ service_handshake = Common.read_bson_document(socket)
@logger.trace 'Service Handshake', service_handshake
# #TODO When a reconnect returns registered == false need to throw an exception
# so that this host connection is not used
registered = service_handshake['registered']
@@ -157,31 +158,31 @@
socket.write(BSON.serialize(request))
# Since Send does not affect state on the server we can also retry reads
if idempotent
@logger.debug "Reading header from server"
- header = self.class.read_bson_document(socket)
+ header = Common.read_bson_document(socket)
@logger.debug 'Response Header', header
# Read the BSON response document
@logger.debug "Reading response from server"
- response = self.class.read_bson_document(socket)
+ response = Common.read_bson_document(socket)
@logger.trace 'Response', response
end
end
# Perform the read outside the retry block since a successful write
# means that the servers state may have been changed
unless idempotent
# Read header first as a separate BSON document
@logger.debug "Reading header from server"
- header = self.class.read_bson_document(socket)
+ header = Common.read_bson_document(socket)
@logger.debug 'Response Header', header
# Read the BSON response document
@logger.debug "Reading response from server"
- response = self.class.read_bson_document(socket)
+ response = Common.read_bson_document(socket)
@logger.trace 'Response', response
end
# Ensure the sequence number in the response header matches the
# sequence number sent in the request
@@ -217,26 +218,9 @@
@socket.close if @socket
end
########################
protected
-
- # Returns a BSON document read from the socket.
- # Returns nil if the operation times out or if a network
- # connection failure occurs
- def self.read_bson_document(socket)
- bytebuf = BSON::ByteBuffer.new
- # Read 4 byte size of following BSON document
- bytes = socket.read(4)
-
- # Read BSON document
- sz = bytes.unpack("V")[0]
- raise "Invalid Data received from server:#{bytes.inspect}" unless sz
-
- bytebuf.append!(bytes)
- bytebuf.append!(socket.read(sz - 4))
- return BSON.deserialize(bytebuf)
- end
# Returns a new connection pool for the specified server
def self.new_connection_pool(server, params={})
# Connection pool configuration options
config = pool_config.dup