=begin rdoc

= Object#detach

Written by Evan Webb

Detach an object into a separate process.

Simplest example expands to all objects.

  require 'detach'

  s = "blah"
  s2 = s.detach
  s2.size # 4
  s2.to_s # blah

This basic example is true for all objects. All methods are passed to
the object running in the other process and the return of #detach can be
used in exactly the same way as the original object.

Useful in splitting off sections of an application that can then use their
own thread table and will have better performance than running all parts
of a large application under one ruby thread scheduler.

=end

require 'drb'

module Dispatch

  # Classes whose instances are left alone by Dispatch.force_reference, so
  # DRb keeps marshalling them by value. Numeric covers Integer (formerly
  # Fixnum/Bignum) and Float; nil/true/false are listed so that no
  # singleton method is ever defined on NilClass/TrueClass/FalseClass.
  PASS_BY_VALUE = [
    Symbol, String, Numeric, Array, Regexp, NilClass, TrueClass, FalseClass
  ].freeze

  # Returns true when +obj+ should be left untouched by force_reference:
  # either it is an instance of one of the PASS_BY_VALUE classes, or it is
  # frozen (a singleton method cannot be defined on a frozen object).
  def self.pass_by_value?(obj)
    PASS_BY_VALUE.any? { |type| obj.kind_of?(type) } || obj.frozen?
  end

  # Gives +obj+ a singleton _dump that raises TypeError, unless
  # pass_by_value?(obj) is true. Object#detach applies this to every
  # argument of a remote call when :allref is enabled, with the intent
  # that the argument travels as a reference instead of a copy.
  #
  # Returns +obj+.
  def self.force_reference(obj)
    return obj if pass_by_value?(obj)
    def obj._dump(_depth)
      raise TypeError, 'can\'t dump'
    end
    obj
  end

  # These are created for each background work unit (method call)
  # that is created. They keep stats and allow for indexing
  # when the value is ready.
  class BackgroundUnit
    attr_accessor :unit_id, :start, :stop
    attr_reader :value, :thread

    # id:: the index under which the dispatcher stores this unit.
    def initialize(id)
      @unit_id = id
      @thread = nil
      @start = nil
      @stop = nil
    end

    # Sets the thread of the unit and records the start time.
    def thread=(th)
      @thread = th
      @start = Time.now
    end

    # Sets the value of the unit and records the stop time.
    def value=(v)
      @stop = Time.now
      @value = v
    end

    # Indicates the elapsed time (in seconds) spent on the unit, or nil
    # while either the start or the stop time has not been recorded yet.
    def elapse
      return nil if @start.nil? || @stop.nil?
      @stop - @start
    end

    # Indicates if the unit has finished processing, i.e. its thread is
    # no longer alive.
    def ready?
      !@thread.alive?
    end
  end

  # Attached as the front object to a DRb, and given its own
  # front object, method calls on this object spin a new thread
  # to handle the method and return a BackgroundObject right
  # away. Future usage of said BackgroundObject causes it to
  # request the real object from the Dispatch (via DRb), at
  # which time operations may block. This allows for the time
  # between object creation and object usage for the real remote
  # object to do its work, not blocking the local objects.
  class BackgroundDispatch
    # Make these fall through to method_missing. Only undefine what
    # actually exists: Object#to_a is gone on Ruby >= 1.9, and a plain
    # "undef :to_a" raises NameError there.
    [:to_s, :to_a, :respond_to?].each do |meth|
      undef_method(meth) if method_defined?(meth)
    end

    # front:: the real object that method calls are executed on.
    # drb::   when true, the BackgroundObjects handed out refer back to
    #         this dispatcher through a DRbObject instead of directly.
    def initialize(front, drb=false)
      @front = front
      @units = Hash.new
      @drb = drb
    end

    # This is defined to keep DRb and such from trying
    # to move the Dispatcher, which must remain in the same
    # context as the called method. Though, having Thread
    # objects referenced with @units makes that true as well.
    def _dump(d)
      raise TypeError, "can't be moved"
    end

    # Someone has requested an operation on
    # this front object to be performed. Ok, let's spin
    # off the method to do its work and return a
    # BackgroundObject as a stand-in.
    def method_missing(msg_id, *a, &b)
      handle = @drb ? DRbObject.new(self) : self
      obj = BackgroundObject.new(handle)

      unit = BackgroundUnit.new(obj.index)
      unit.thread = Thread.new(unit) { |u|
        u.value = @front.__send__(msg_id, *a, &b)
      }
      @units[obj.index] = unit
      obj
    end

    # Used to fetch the value of the work unit associated with
    # the BackgroundObject at index. This method blocks,
    # waiting until the value is ready before returning it.
    #
    # The worker thread is always joined, even when it has already
    # finished, so an exception raised by the background call is
    # re-raised here every time instead of only when the caller happens
    # to ask before the thread has died.
    def _get_obj(index)
      unit = @units[index]
      unit.thread.join
      unit.value
    end

    # Used to check if the work unit is ready.
    def _obj_ready?(uid)
      @units[uid].ready?
    end

    # Used for stats transparent back to the caller.
    # This is done in a separate struct rather than just
    # returning the unit object because it can easily be
    # serialized and used on the local side of a DRb session.
    Stats = Struct.new(:start, :stop, :running)

    # Returns the stats for a work unit. An unknown uid yields a Stats
    # whose members are all nil.
    def _obj_stats(uid)
      unit = @units[uid]
      return Stats.new if unit.nil?
      Stats.new(unit.start, unit.stop, unit.thread.alive?)
    end
  end

  # Used to create non-blocking remote method calls. Has an
  # index of a work unit and a dispatch object to call on
  # to do all the work.
  class BackgroundObject
    attr_reader :index

    # dispatch:: the BackgroundDispatch (or a DRbObject referring to one)
    #            that owns the work unit. The index is this object's own
    #            object_id.
    def initialize(dispatch)
      @dispatch = dispatch
      @index = self.object_id()
    end

    # Forward these to the real result through method_missing; see the
    # matching note in BackgroundDispatch about Object#to_a.
    [:to_s, :to_a, :respond_to?].each do |meth|
      undef_method(meth) if method_defined?(meth)
    end

    # Passes all unknown method calls back to the
    # original object by way of the dispatch. Blocks until the
    # background call has produced its value.
    def method_missing(msg_id, *a, &b)
      obj = @dispatch._get_obj(@index)
      obj.__send__(msg_id, *a, &b)
    end

    # Returns true if the real object is ready.
    def ready?
      @dispatch._obj_ready?(@index)
    end
    alias :done? :ready?

    # Returns stats for the backgrounded call.
    def stats
      @dispatch._obj_stats(@index)
    end

    # I added when it choked needing this
    def self._load(d)
    end
  end
end

class Object

  # Accepts a hash of options to use when detaching.
  #
  # :ps - Controls if the title of the new process is changed
  # to reflect it being a detached object.
  #
  # Values:
  # * :append - Append information to the current process title
  # * :replace - Replace the current process title altogether
  # * :none - Don't change the process title at all
  # * String - Any String value is used verbatim as the process title
  #
  #
  # :name - Sets what the name of the process actually is.
  #
  # Values:
  # * :parent - Uses the current process title as the name of the
  #   detached process (ie, $0 of the parent)
  # * String - Any String value is used as the name
  #
  #
  # :id - Sets what the id of the process will be in the new
  # process title.
  #
  # Values:
  # * :object  Uses a default id, looks like the default Object#to_s
  # * String   Any String value is used as the id
  #
  #
  # :die - Controls if the child process will be killed if the
  # parent exits.
  #
  # Values:
  # * :parent  Detached object process follows the course of
  #   the parent process
  # * :none    The detached object process will continue running
  #   in spite of the fate of the parent process
  #
  # :path - The template of the location of the pipe used for
  # communication between the parent and the detached process.
  #
  # Defaults to: '/tmp/drb_detach.#{pid}'
  #
  # Use single quotes to embed variables to be filled in later.
  #
  # Available variables:
  # * pid   The process id of the detached process
  #
  #
  # :allref - Controls how objects are passed to and from the
  # detached process. If :allref is set to true, all objects
  # passed back and forth from the remote object are only
  # passed as references, so that the original executing context
  # (ie. process) is always used. This option defaults to true
  # because it is the expected operating procedure. The spelling
  # :allrefs is accepted as an alias.
  #
  # The use of this option takes pains to allow objects you would
  # always expect to be Marshalled to still be Marshalled, such as
  # Symbol, String, Integer, Float, Array, etc.
  # (see Dispatch::PASS_BY_VALUE).
  #
  # NOTE: if you have problems with objects passed to detached
  # objects executing their methods in the detached process, and
  # you want them executed in the original context, try including
  # DRb::DRbUndumped in your object rather than using this.
  #
  # :background - Controls how methods are dispatched on the detached
  # object. If :background is true, then all method calls are done
  # non-blocking, and a place holder object is returned. When
  # that place holder object has its methods called, it blocks,
  # waiting for the real method operation to finish.
  #
  # Returns a DRbObject that refers to the front object served by
  # the detached process.
  def detach(cfg={})
    cfg = {
      :ps => :append,
      :die => :parent,
      :name => :parent,
      :id => :object,
      :path => '/tmp/drb_detach.#{pid}',
      :allref => true,
      :background => true
    }.merge(cfg)

    # The option was documented as :allrefs but read as :allref; honour
    # both. cfg is already a private copy, so the caller's hash is safe.
    cfg[:allref] = cfg.delete(:allrefs) if cfg.key?(:allrefs)

    name = cfg[:name] == :parent ? $0 : cfg[:name].to_s

    id = if cfg[:id] == :object
      "#<#{self.class}:0x#{self.object_id.to_s(16)}>"
    else
      cfg[:id]
    end

    pid = fork {
      # A nice little trick
      case cfg[:ps]
      when :append
        $0 = "#{name} - #{id}"
      when :replace
        $0 = id.to_s
      when String
        $0 = cfg[:ps]
      end
      # :none (or any other value) leaves the process title untouched.

      trap("INT") { exit }

      # Shares the outer local so the :path template below can see it.
      pid = Process.pid

      obj = if cfg[:background] == true
        Dispatch::BackgroundDispatch.new(self, true)
      else
        self
      end

      # NOTE(review): the :path template is expanded with eval so that
      # '#{pid}' interpolates. Never feed an untrusted :path in here.
      DRb.start_service("drbunix://" + eval(%!"#{cfg[:path]}"!), obj)
      DRb.thread.join
      exit
    }

    # Mom always said I should clean up after myself
    if cfg[:die] == :parent
      owner = Process.pid
      at_exit {
        # Children forked later inherit this handler; only the process
        # that actually created the child may kill it.
        if Process.pid == owner
          begin
            Process.kill("INT", pid)
          rescue Errno::ESRCH
            # The detached process is already gone; nothing to clean up.
          end
        end
      }
    end

    # Give the child a moment to bring its DRb service up.
    sleep 0.3

    DRb.start_service

    # Same template expansion as in the child; pid is the child's pid here.
    remote = DRbObject.new(nil, "drbunix://" + eval(%!"#{cfg[:path]}"!))

    if cfg[:allref] == true
      remote.instance_eval "alias :real_method_missing :method_missing"
      def remote.method_missing(msg_id, *a, &b)
        a.each { |arg| Dispatch.force_reference(arg) }
        # Forward the block too, so it reaches the remote call.
        real_method_missing(msg_id, *a, &b)
      end
    end

    remote
  end
end

=begin test

require 'test/unit'

# fixture
class Logger
  def log(str)
    puts "From #{Process.pid}: #{str.to_s}"
  end
end

class Person
  attr_accessor :log

  def say(msg)
    puts "Detached #{Process.pid}"
    @log.log("hello, i said #{msg}")
  end

  def wait
    sleep 10
    "hi"
  end
end

class TC_Detach < Test::Unit::TestCase
  puts "Parent is #{Process.pid}"
  log = Logger.new
  p = Person.new
  p = p.detach(:allref => true)
  puts "p is a #{p.class}"
  p.log = log
  p.say("I won!")
  s = p.wait
  puts "s is a #{s.class}"
  puts "While i'm here, the other process should be doing it's thing."
  puts "It's waiting for 10 seconds, so if i wait for 5, it should only be another 5 after i ask for the method that it returns"
  sleep 5
  puts "Lets try something..."
  while not s.ready?
    puts "Is the method ready? (#{s.ready?})"
    sleep 2
  end
  p s.to_s
  puts "Stats:"
  p s.stats
end

=end