lib/marilyn-rpc/client.rb in marilyn-rpc-0.0.3 vs lib/marilyn-rpc/client.rb in marilyn-rpc-0.0.4
- old
+ new
@@ -2,11 +2,11 @@
require 'thread'
module MarilynRPC
# A class with nothing but `__send__` and `__id__`
class ClientBlankSlate
- instance_methods.each { |m| undef_method m unless m =~ /^__/ }
+ instance_methods.each { |m| undef_method m unless m =~ /^__|object_id/ }
end
class NativeClientProxy < ClientBlankSlate
# Creates a new Native client proxy, were the calls get send to the remote
# side.
@@ -30,33 +30,35 @@
# TestService = client.for(:test)
# TestService.add(1, 2)
# TestService.time.to_f
#
class NativeClient
+ MAIL_KEY = :_mlynml
+
# Create a native client for the socket.
# @param [Socket] socket the socket to manage
def initialize(socket)
@socket = socket
@semaphore = Mutex.new
@threads = {}
- @responses = {}
@thread = Thread.new do
+ # read the answer of the server back in
+ envelope = MarilynRPC::Envelope.new
loop do
- # read the answer of the server back in
- answer = MarilynRPC::Envelope.new
# read the header to have the size
- answer.parse!(@socket.read(4))
- # so now that we know the site, read the rest of the envelope
- answer.parse!(@socket.read(answer.size))
+ envelope.parse_header! @socket.read(MarilynRPC::Envelope::HEADER_SIZE)
+ # so now that we know the site, read the rest of the envelope without
+ # parsing
+ envelope.content = @socket.read(envelope.size)
# returns the result part of the mail or raise the exception if there is
# one
- mail = MarilynRPC::MailFactory.unpack(answer)
- @semaphore.synchronize do
- @responses[mail.tag] = mail # save the mail for the waiting thread
- @threads.delete(mail.tag).wakeup # wake up the waiting thread
- end
+ mail = MarilynRPC::MailFactory.unpack(envelope)
+ thread = @semaphore.synchronize { @threads.delete(mail.tag) }
+ thread[MAIL_KEY] = mail # save the mail for the waiting thread
+ thread.wakeup # wake up the waiting thread
+ envelope.reset!
end
end
end
# Disconnect the client from the remote.
@@ -127,26 +129,23 @@
# @return [Object] the result of the call
def execute(path, method, args)
thread = Thread.current
tag = "#{Time.now.to_f}:#{thread.object_id}"
- @semaphore.synchronize do
+ @semaphore.synchronize {
# since this client can't multiplex, we set the tag to nil
@socket.write(MarilynRPC::MailFactory.build_call(tag, path, method, args))
- end
+
+ # lets write our self to the list of waining threads
+ @threads[tag] = thread
+ }
- # lets write our self to the list of waining threads
- @semaphore.synchronize { @threads[tag] = thread }
-
- # enable the listening for a response from the remote end
- @thread.wakeup
-
# stop the current thread, the thread will be started after the response
# arrived
Thread.stop
# get mail from responses
- mail = @semaphore.synchronize { @responses.delete(tag) }
+ mail = thread[MAIL_KEY]
if mail.is_a? MarilynRPC::CallResponseMail
mail.result
else
raise MarilynError.new # raise exception to capture the client backtrace