lib/marilyn-rpc/client.rb in marilyn-rpc-0.0.1 vs lib/marilyn-rpc/client.rb in marilyn-rpc-0.0.2
- old
+ new
@@ -1,37 +1,25 @@
require 'socket'
+require 'thread'
module MarilynRPC
- class NativeClientProxy
+ class BlankSlate
+ instance_methods.each { |m| undef_method m unless m =~ /^__/ }
+ end
+
+ class NativeClientProxy < BlankSlate
# Creates a new Native client proxy, were the calls get send to the remote
# side.
# @param [Object] path the path that is used to identify the service
- # @param [Socekt] socket the socket to use for communication
- def initialize(path, socket)
- @path, @socket = path, socket
+ # @param [NativeClient] client the client to use for communication
+ def initialize(path, client)
+ @path, @client = path, client
end
# Handler for calls to the remote system
def method_missing(method, *args, &block)
- # since this client can't multiplex, we set the tag to nil
- @socket.write(MarilynRPC::MailFactory.build_call(nil, @path, method, args))
-
- # read the answer of the server back in
- answer = MarilynRPC::Envelope.new
- # read the header to have the size
- answer.parse!(@socket.read(4))
- # so now that we know the site, read the rest of the envelope
- answer.parse!(@socket.read(answer.size))
-
- # returns the result part of the mail or raise the exception if there is
- # one
- mail = MarilynRPC::MailFactory.unpack(answer)
- if mail.is_a? MarilynRPC::CallResponseMail
- mail.result
- else
- raise mail.exception
- end
+ @client.execute(@path, method, args)
end
end
# The client that will handle the socket to the remote. The native client is
# written in pure ruby.
@@ -45,10 +33,31 @@
class NativeClient
# Create a native client for the socket.
# @param [Socket] socket the socket to manage
def initialize(socket)
@socket = socket
+ @semaphore = Mutex.new
+ @threads = {}
+ @responses = {}
+ @thread = Thread.new do
+ loop do
+ # read the answer of the server back in
+ answer = MarilynRPC::Envelope.new
+ # read the header to have the size
+ answer.parse!(@socket.read(4))
+ # so now that we know the site, read the rest of the envelope
+ answer.parse!(@socket.read(answer.size))
+
+ # returns the result part of the mail or raise the exception if there is
+ # one
+ mail = MarilynRPC::MailFactory.unpack(answer)
+ @semaphore.synchronize do
+ @responses[mail.tag] = mail # save the mail for the waiting thread
+ @threads.delete(mail.tag).wakeup # wake up the waiting thread
+ end
+ end
+ end
end
# Disconnect the client from the remote.
def disconnect
@socket.close
@@ -58,11 +67,11 @@
# @param [Object] path the path were the service is registered on the remote
# site
# @return [MarilynRPC::NativeClientProxy] the proxy obejct that will serve
# all calls
def for(path)
- NativeClientProxy.new(path, @socket)
+ NativeClientProxy.new(path, self)
end
# Connect to a unix domain socket.
# @param [String] path the path to the socket file.
# @return [MarilynRPC::NativeClient] the cónnected client
@@ -71,11 +80,67 @@
end
# Connect to a tcp socket.
# @param [String] host the host to cennect to (e.g. 'localhost')
# @param [Integer] port the port to connect to (e.g. 8000)
- # @return [MarilynRPC::NativeClient] the cónnected client
- def self.connect_tcp(host, port)
- new(TCPSocket.open(host, port))
+ # @param [Hash] options the
+ # @option options [Boolean] :secure use tls/ssl for the connection
+ # `true` or `false`
+ # @option options [OpenSSL::SSL::SSLContext] :ssl_context can be used to
+ # change the ssl context of the newly created secure connection. Only
+ # takes effect if `:secure` option is enabled.
+ # @return [MarilynRPC::NativeClient] the connected client
+ def self.connect_tcp(host, port, options = {})
+ if options[:secure] == true
+ require 'openssl' # use openssl for secure connections
+ socket = TCPSocket.new(host, port)
+ if ssl_context = options[:ssl_context]
+ secure_socket = OpenSSL::SSL::SSLSocket.new(socket, ssl_context)
+ else
+ secure_socket = OpenSSL::SSL::SSLSocket.new(socket)
+ end
+ secure_socket.connect
+ new(secure_socket)
+ else
+ new(TCPSocket.open(host, port))
+ end
+ end
+
+ # Executes a client call blocking. To issue an async call one needs to
+ # have start separate threads. THe Native client uses then multiplexing to
+ # avoid the other threads blocking.
+ # @api private
+ # @param [Object] path the path to identifiy the service
+ # @param [Symbol, String] method the method name to call on the service
+ # @param [Array<Object>] args the arguments that are passed to the remote
+ # side
+ # @return [Object] the result of the call
+ def execute(path, method, args)
+ thread = Thread.current
+ tag = "#{Time.now.to_f}:#{thread.object_id}"
+
+ @semaphore.synchronize do
+ # since this client can't multiplex, we set the tag to nil
+ @socket.write(MarilynRPC::MailFactory.build_call(tag, path, method, args))
+ end
+
+ # lets write our self to the list of waining threads
+ @semaphore.synchronize { @threads[tag] = thread }
+
+ # enable the listening for a response from the remote end
+ @thread.wakeup
+
+ # stop the current thread, the thread will be started after the response
+ # arrived
+ Thread.stop
+
+ # get mail from responses
+ mail = @semaphore.synchronize { @responses.delete(tag) }
+
+ if mail.is_a? MarilynRPC::CallResponseMail
+ mail.result
+ else
+ raise mail.exception
+ end
end
end
end