lib/marilyn-rpc/client.rb in marilyn-rpc-0.0.1 vs lib/marilyn-rpc/client.rb in marilyn-rpc-0.0.2

- old
+ new

@@ -1,37 +1,25 @@ require 'socket' +require 'thread' module MarilynRPC - class NativeClientProxy + class BlankSlate + instance_methods.each { |m| undef_method m unless m =~ /^__/ } + end + + class NativeClientProxy < BlankSlate # Creates a new Native client proxy, were the calls get send to the remote # side. # @param [Object] path the path that is used to identify the service - # @param [Socekt] socket the socket to use for communication - def initialize(path, socket) - @path, @socket = path, socket + # @param [NativeClient] client the client to use for communication + def initialize(path, client) + @path, @client = path, client end # Handler for calls to the remote system def method_missing(method, *args, &block) - # since this client can't multiplex, we set the tag to nil - @socket.write(MarilynRPC::MailFactory.build_call(nil, @path, method, args)) - - # read the answer of the server back in - answer = MarilynRPC::Envelope.new - # read the header to have the size - answer.parse!(@socket.read(4)) - # so now that we know the site, read the rest of the envelope - answer.parse!(@socket.read(answer.size)) - - # returns the result part of the mail or raise the exception if there is - # one - mail = MarilynRPC::MailFactory.unpack(answer) - if mail.is_a? MarilynRPC::CallResponseMail - mail.result - else - raise mail.exception - end + @client.execute(@path, method, args) end end # The client that will handle the socket to the remote. The native client is # written in pure ruby. @@ -45,10 +33,31 @@ class NativeClient # Create a native client for the socket. # @param [Socket] socket the socket to manage def initialize(socket) @socket = socket + @semaphore = Mutex.new + @threads = {} + @responses = {} + @thread = Thread.new do + loop do + # read the answer of the server back in + answer = MarilynRPC::Envelope.new + # read the header to have the size + answer.parse!(@socket.read(4)) + # so now that we know the site, read the rest of the envelope + answer.parse!(@socket.read(answer.size)) + + # returns the result part of the mail or raise the exception if there is + # one + mail = MarilynRPC::MailFactory.unpack(answer) + @semaphore.synchronize do + @responses[mail.tag] = mail # save the mail for the waiting thread + @threads.delete(mail.tag).wakeup # wake up the waiting thread + end + end + end end # Disconnect the client from the remote. def disconnect @socket.close @@ -58,11 +67,11 @@ # @param [Object] path the path were the service is registered on the remote # site # @return [MarilynRPC::NativeClientProxy] the proxy obejct that will serve # all calls def for(path) - NativeClientProxy.new(path, @socket) + NativeClientProxy.new(path, self) end # Connect to a unix domain socket. # @param [String] path the path to the socket file. # @return [MarilynRPC::NativeClient] the cónnected client @@ -71,11 +80,67 @@ end # Connect to a tcp socket. # @param [String] host the host to cennect to (e.g. 'localhost') # @param [Integer] port the port to connect to (e.g. 8000) - # @return [MarilynRPC::NativeClient] the cónnected client - def self.connect_tcp(host, port) - new(TCPSocket.open(host, port)) + # @param [Hash] options the + # @option options [Boolean] :secure use tls/ssl for the connection + # `true` or `false` + # @option options [OpenSSL::SSL::SSLContext] :ssl_context can be used to + # change the ssl context of the newly created secure connection. Only + # takes effect if `:secure` option is enabled. + # @return [MarilynRPC::NativeClient] the connected client + def self.connect_tcp(host, port, options = {}) + if options[:secure] == true + require 'openssl' # use openssl for secure connections + socket = TCPSocket.new(host, port) + if ssl_context = options[:ssl_context] + secure_socket = OpenSSL::SSL::SSLSocket.new(socket, ssl_context) + else + secure_socket = OpenSSL::SSL::SSLSocket.new(socket) + end + secure_socket.connect + new(secure_socket) + else + new(TCPSocket.open(host, port)) + end + end + + # Executes a client call blocking. To issue an async call one needs to + # have start separate threads. THe Native client uses then multiplexing to + # avoid the other threads blocking. + # @api private + # @param [Object] path the path to identifiy the service + # @param [Symbol, String] method the method name to call on the service + # @param [Array<Object>] args the arguments that are passed to the remote + # side + # @return [Object] the result of the call + def execute(path, method, args) + thread = Thread.current + tag = "#{Time.now.to_f}:#{thread.object_id}" + + @semaphore.synchronize do + # since this client can't multiplex, we set the tag to nil + @socket.write(MarilynRPC::MailFactory.build_call(tag, path, method, args)) + end + + # lets write our self to the list of waining threads + @semaphore.synchronize { @threads[tag] = thread } + + # enable the listening for a response from the remote end + @thread.wakeup + + # stop the current thread, the thread will be started after the response + # arrived + Thread.stop + + # get mail from responses + mail = @semaphore.synchronize { @responses.delete(tag) } + + if mail.is_a? MarilynRPC::CallResponseMail + mail.result + else + raise mail.exception + end end end end