lib/ruby_skynet/client.rb in ruby_skynet-0.1.2 vs lib/ruby_skynet/client.rb in ruby_skynet-0.2.0

- old
+ new

@@ -1,9 +1,6 @@ require 'bson' -require 'sync_attr' -require 'multi_json' - # # RubySkynet Client # # Supports # RPC calls to Skynet @@ -11,128 +8,40 @@ # module RubySkynet class Client include SyncAttr - # Default doozer configuration - # To replace this default, set the config as follows: - # RubySkynet::Client.doozer_config = { .... } - sync_attr_accessor :doozer_config do - { - :server => '127.0.0.1:8046', - :read_timeout => 5, - :connect_timeout => 3, - :connect_retry_interval => 0.1, - :connect_retry_count => 3 - } - end - - # Lazy initialize Doozer Client - sync_cattr_reader :doozer do - Doozer::Client.new - end - - # Create a client connection, call the supplied block and close the connection on - # completion of the block - # - # Example - # - # require 'ruby_skynet' - # SemanticLogger.default_level = :trace - # SemanticLogger.appenders << SemanticLogger::Appender::File(STDOUT) - # RubySkynet::Client.connect('TutorialService') do |tutorial_service| - # p tutorial_service.call(:value => 5) - # end - def self.connect(service_name, params={}) - begin - client = self.new(service_name, params) - yield(client) - ensure - client.close if client - end - end - # Returns a new RubySkynet Client for the named service # + # Calls to an instance of the Client are thread-safe and can be called + # concurrently from multiple threads at the same time + # # Parameters: # :service_name # Name of the service to look for and connect to on Skynet # - # :doozer_servers [Array of String] - # Array of URL's of doozer servers to connect to with port numbers - # ['server1:2000', 'server2:2000'] + # :version + # Optional version number of the service in Skynet + # Default: '*' being the latest version of the service # - # The second server will only be attempted once the first server - # cannot be connected to or has timed out on connect - # A read failure or timeout will not result in switching to the second - # server, only a connection failure or during an automatic reconnect + # :region + # Optional region for this service in Skynet + # Default: 'Development' # - # :read_timeout [Float] - # Time in seconds to timeout on read - # Can be overridden by supplying a timeout in the read call - # Default: 60 + # Example # - # :connect_timeout [Float] - # Time in seconds to timeout when trying to connect to the server - # Default: Half of the :read_timeout ( 30 seconds ) + # require 'ruby_skynet' + # SemanticLogger.default_level = :trace + # SemanticLogger.appenders << SemanticLogger::Appender::File(STDOUT) # - # :connect_retry_count [Fixnum] - # Number of times to retry connecting when a connection fails - # Default: 10 - # - # :connect_retry_interval [Float] - # Number of seconds between connection retry attempts after the first failed attempt - # Default: 0.5 - def initialize(service_name, params = {}) + # tutorial_service = RubySkynet::Client.new('TutorialService') + # p tutorial_service.call('Add', :value => 5) + def initialize(service_name, version='*', region='Development') @service_name = service_name - @logger = SemanticLogger::Logger.new("#{self.class.name}: #{service_name}") - - # User configurable options - params[:read_timeout] ||= 60 - params[:connect_timeout] ||= 30 - params[:connect_retry_interval] ||= 0.1 - params[:connect_retry_count] ||= 5 - - # If Server name and port of where Skynet Service is running - # is not supplied look for it in Doozer - unless params[:server] || params[:servers] - params[:server] = self.class.server_for(service_name) - end - - # Disable buffering the send since it is a RPC call - params[:buffered] = false - - @logger.trace "Socket Connection parameters", params - - # For each new connection perform the Skynet handshake - params[:on_connect] = Proc.new do |socket| - # Reset user_data on each connection - socket.user_data = 0 - - # Receive Service Handshake - # Registered bool - # Registered indicates the state of this service. If it is false, the connection will - # close immediately and the client should look elsewhere for this service. - # - # ClientID string - # ClientID is a UUID that is used by the client to identify itself in RPC requests. - @logger.debug "Waiting for Service Handshake" - service_handshake = self.class.read_bson_document(socket) - @logger.trace 'Service Handshake', service_handshake - - # #TODO When a reconnect returns registered == false we need to go back to doozer - @registered = service_handshake['registered'] - @client_id = service_handshake['clientid'] - - # Send blank ClientHandshake - client_handshake = { 'clientid' => @client_id } - @logger.debug "Sending Client Handshake" - @logger.trace 'Client Handshake', client_handshake - socket.write(BSON.serialize(client_handshake)) - end - - @socket = ResilientSocket::TCPClient.new(params) + @logger = SemanticLogger::Logger.new("#{self.class.name}: #{service_name}/#{version}/#{region}") + @version = version + @region = region end # Performs a synchronous call to the Skynet Service # # Parameters: @@ -143,151 +52,30 @@ # # Returns the Hash result returned from the Skynet Service # # Raises RubySkynet::ProtocolError # Raises RubySkynet::SkynetException - def call(method_name, parameters) + def call(method_name, parameters, connection_params={}) # Skynet requires BSON RPC Calls to have the following format: # https://github.com/bketelsen/skynet/blob/protocol/protocol.md request_id = BSON::ObjectId.new.to_s @logger.tagged request_id do @logger.benchmark_info "Called Skynet Service: #{@service_name}.#{method_name}" do - - # Resilient Send - retry_count = 0 - @socket.retry_on_connection_failure do |socket| - # user_data is maintained per session and a different session could - # be supplied with each retry - socket.user_data ||= 0 - header = { - 'servicemethod' => "#{@service_name}.Forward", - 'seq' => socket.user_data, - } - @logger.debug "Sending Header" - @logger.trace 'Header', header - socket.write(BSON.serialize(header)) - - @logger.trace 'Parameters:', parameters - - # The parameters are placed in the request object in BSON serialized - # form - request = { - 'clientid' => @client_id, - 'in' => BSON.serialize(parameters).to_s, - 'method' => method_name.to_s, - 'requestinfo' => { - 'requestid' => request_id, - # Increment retry count to indicate that the request may have been tried previously - # TODO: this should be incremented if request is retried, - 'retrycount' => retry_count, - # TODO: this should be forwarded along in case of services also - # being a client and calling additional services. If empty it will - # be stuffed with connecting address - 'originaddress' => '' - } - } - - @logger.debug "Sending Request" - @logger.trace 'Request', request - socket.write(BSON.serialize(request)) + retries = 0 + # If it cannot connect to a server, try a different server + begin + Connection.with_connection(Registry.server_for(@service_name, @version, @region), connection_params) do |connection| + connection.rpc_call(request_id, @service_name, method_name, parameters) + end + rescue ResilientSocket::ConnectionFailure => exc + if (retries < 3) && exc.cause.is_a?(Errno::ECONNREFUSED) + retries += 1 + retry + end + # TODO rescue ServiceUnavailable retry x times until the service becomes available end - - # Once send is successful it could have been processed, so we can no - # longer retry now otherwise we could create a duplicate - # retry_count += 1 - - # Read header first as a separate BSON document - @logger.debug "Reading header from server" - header = self.class.read_bson_document(@socket) - @logger.debug 'Header', header - - # Read the BSON response document - @logger.debug "Reading response from server" - response = self.class.read_bson_document(@socket) - @logger.trace 'Response', response - - # Ensure the sequence number in the response header matches the - # sequence number sent in the request - if seq_no = header['seq'] - raise ProtocolError.new("Incorrect Response received, expected seq=#{@socket.user_data}, received: #{header.inspect}") if seq_no != @socket.user_data - else - raise ProtocolError.new("Invalid Response header, missing 'seq': #{header.inspect}") - end - - # Increment Sequence number only on successful response - @socket.user_data += 1 - - # If an error is returned from Skynet raise a Skynet exception - if error = header['error'] - raise SkynetException.new(error) if error.to_s.length > 0 - end - - # If an error is returned from the service raise a Service exception - if error = response['error'] - raise ServiceException.new(error) if error.to_s.length > 0 - end - - # Return Value - # The return value is inside the response object, it's a byte array of it's own and needs to be deserialized - result = BSON.deserialize(response['out']) - @logger.trace 'Return Value', result - result end end - end - - # Returns a BSON document read from the socket. - # Returns nil if the operation times out or if a network - # connection failure occurs - def self.read_bson_document(socket) - bytebuf = BSON::ByteBuffer.new - # Read 4 byte size of following BSON document - bytes = '' - socket.read(4, bytes) - - # Read BSON document - sz = bytes.unpack("V")[0] - raise "Invalid Data received from server:#{bytes.inspect}" unless sz - - bytebuf.append!(bytes) - bytes = '' - sz -= 4 - until bytes.size >= sz - buf = '' - socket.read(sz, buf) - bytes << buf - end - bytebuf.append!(bytes) - return BSON.deserialize(bytebuf) - end - - def close() - @socket.close - end - - ############################## - #protected - - # Returns [Array] of the hostname and port pair [String] that implements a particular service - # Performs a doozer lookup to find the servers - # - # service_name: - # version: Version of service to locate - # Default: Find latest version - def self.registered_implementers(service_name, version = '*', region = 'Development') - hosts = [] - doozer.walk("/services/#{service_name}/#{version}/#{region}/*/*").each do |node| - entry = MultiJson.load(node.value) - hosts << entry if entry['Registered'] - end - hosts - end - - # Randomly returns a server that implements the requested service - def self.server_for(service_name, version = '*', region = 'Development') - hosts = registered_implementers(service_name, version, region) - service = hosts[rand(hosts.size)]['Config']['ServiceAddr'] - "#{service['IPAddress']}:#{service['Port']}" end end end