lib/marilyn-rpc/client.rb in marilyn-rpc-0.0.3 vs lib/marilyn-rpc/client.rb in marilyn-rpc-0.0.4

- old
+ new

@@ -2,11 +2,11 @@ require 'thread' module MarilynRPC # A class with nothing but `__send__` and `__id__` class ClientBlankSlate - instance_methods.each { |m| undef_method m unless m =~ /^__/ } + instance_methods.each { |m| undef_method m unless m =~ /^__|object_id/ } end class NativeClientProxy < ClientBlankSlate # Creates a new Native client proxy, were the calls get send to the remote # side. @@ -30,33 +30,35 @@ # TestService = client.for(:test) # TestService.add(1, 2) # TestService.time.to_f # class NativeClient + MAIL_KEY = :_mlynml + # Create a native client for the socket. # @param [Socket] socket the socket to manage def initialize(socket) @socket = socket @semaphore = Mutex.new @threads = {} - @responses = {} @thread = Thread.new do + # read the answer of the server back in + envelope = MarilynRPC::Envelope.new loop do - # read the answer of the server back in - answer = MarilynRPC::Envelope.new # read the header to have the size - answer.parse!(@socket.read(4)) - # so now that we know the site, read the rest of the envelope - answer.parse!(@socket.read(answer.size)) + envelope.parse_header! @socket.read(MarilynRPC::Envelope::HEADER_SIZE) + # so now that we know the site, read the rest of the envelope without + # parsing + envelope.content = @socket.read(envelope.size) # returns the result part of the mail or raise the exception if there is # one - mail = MarilynRPC::MailFactory.unpack(answer) - @semaphore.synchronize do - @responses[mail.tag] = mail # save the mail for the waiting thread - @threads.delete(mail.tag).wakeup # wake up the waiting thread - end + mail = MarilynRPC::MailFactory.unpack(envelope) + thread = @semaphore.synchronize { @threads.delete(mail.tag) } + thread[MAIL_KEY] = mail # save the mail for the waiting thread + thread.wakeup # wake up the waiting thread + envelope.reset! end end end # Disconnect the client from the remote. @@ -127,26 +129,23 @@ # @return [Object] the result of the call def execute(path, method, args) thread = Thread.current tag = "#{Time.now.to_f}:#{thread.object_id}" - @semaphore.synchronize do + @semaphore.synchronize { # since this client can't multiplex, we set the tag to nil @socket.write(MarilynRPC::MailFactory.build_call(tag, path, method, args)) - end + + # lets write our self to the list of waining threads + @threads[tag] = thread + } - # lets write our self to the list of waining threads - @semaphore.synchronize { @threads[tag] = thread } - - # enable the listening for a response from the remote end - @thread.wakeup - # stop the current thread, the thread will be started after the response # arrived Thread.stop # get mail from responses - mail = @semaphore.synchronize { @responses.delete(tag) } + mail = thread[MAIL_KEY] if mail.is_a? MarilynRPC::CallResponseMail mail.result else raise MarilynError.new # raise exception to capture the client backtrace