# Copyright (c) 2007, 2011 Samuel G. D. Williams. # Released under the MIT license. Please see LICENSE.txt for license details. # This class is as small and independant as possible as it will get sent to clients for execution. require 'thread' require 'monitor' module RExec # A wrapper for sending method invocations over a Connection class Invocation def initialize(name, arguments) @name = name @arguments = arguments end def apply(object) object.send(@name, *@arguments) end class Result def initialize(value) @value = value end attr :value end end # A proxy class to create and send Invocation objects via a Connection, and receive a result. class Proxy def initialize(connection) @connection = connection @method_mutex = Mutex.new end def method_missing(name, *arguments) invocation = Invocation.new(name, arguments) result = nil # Connection provides no transaction support. This means that # if multiple threads are sending and receiving arbirary objects # via the connection, the Proxy object may fail due to out of # line objects. @method_mutex.synchronize do # Send the invocation. @connection.send_object(invocation) # Wait for the result. result = @connection.receive_object end if Invocation::Result === result return result.value else raise InvalidResponse.new("Invalid response received: #{result}") end end end # This class represents an abstract connection to another ruby process. The interface does not impose # any structure on the way this communication link works, except for the fact you can send and receive # objects. You can implement whatever kind of idiom you need for communication on top of this library. # # Depending on how you set things up, this could connect to a local ruby process, or a remote ruby process # via `ssh`. # # To set up a connection, you need to use {start_server}. class Connection public def self.build(process, options, &block) cin = process.input cout = process.output cerr = process.error # We require both cin and cout to be connected in order for connection to work raise InvalidConnectionError.new("Input (#{cin}) or Output (#{cout}) is not connected!") unless cin and cout yield cin cin.puts("\004") return self.new(cout, cin, cerr) end # Create a new connection. You need to supply a pipe for reading input, a pipe for sending output, # and optionally a pipe for errors to be read from. def initialize(input, output, error = nil) @input = input @output = output @running = true @error = error @receive_mutex = Mutex.new @send_mutex = Mutex.new @proxy = Proxy.new(self) @handler = nil end # The object that will handle remote proxy invocations. attr :handler, true # The proxy object that will dispatch RPCs. attr :proxy # The pipe used for reading data def input @input end # The pipe used for writing data def output @output end # The pipe used for receiving errors. On the client side this pipe is writable, on the server # side this pipe is readable. You should avoid using it on the client side and simply use $stderr. def error @error end # Stop the connection, and close the output pipe. def stop if @running @running = false @output.close end end # Return whether or not the connection is running. def running? @running end # This is a very simple runloop. It provides an object when it is received. def run(&block) while @running pipes = IO.select([@input]) if pipes[0].size > 0 object = receive_object if object == nil @running = false return end begin if @handler && Invocation === object result = object.apply(@handler) send_object(Invocation::Result.new(result)) else yield object end rescue Exception => ex send_object(ex) end end end end # Dump any text which has been written to $stderr in the child process. def dump_errors(to = $stderr) if @error and !@error.closed? while true result = IO.select([@error], [], [], 0) break if result == nil to.puts @error.readline.chomp end end end # Receive an object from the connection. This function is thread-safe. This function may block. def receive_object object = nil @receive_mutex.synchronize do begin object = Marshal.load(@input) rescue EOFError object = nil @running = false end end if object and object.kind_of?(Exception) raise object end return object end # Send object(s). This function is thread-safe. def send_object(*objects) @send_mutex.synchronize do objects.each do |o| data = Marshal.dump(o) @output.write(data) end @output.flush end end end end