require 'drb/drb'
require 'fileutils'
require 'tmpdir'
require 'tempfile'
require 'fcntl'

#
# the Slave class encapsulates the work of setting up a drb server in another
# process running on localhost.  the slave process is attached to its parent
# via a Heartbeat which is designed such that the slave cannot out-live its
# parent and become a zombie, even if the parent dies an early death, such as
# by 'kill -9'.  the concept and purpose of the Slave class is to be able to
# setup any server object in another process so easily that using a
# multi-process, drb/ipc, based design is as easy, or easier, than a
# multi-threaded one.  eg
#
#   class Server
#     def add_two n
#       n + 2
#     end
#   end
#
#   slave = Slave.new 'object' => Server.new
#   server = slave.object
#
#   p server.add_two(40) #=> 42
#
# two other methods of providing server objects exist:
#
#   a) server = Server.new "this is called in the parent"
#      Slave.new(:object=>server){|s| puts "#{ s.inspect } passed to block in child process"}
#
#   b) Slave.new{ Server.new "this is called only in the child" }
#
# of the two 'b' is preferred.
#
class Slave
#--{{{
  VERSION = '1.0.0'
  def self.version() VERSION end
  #
  # config - each default may be overridden through the environment
  #
  DEFAULT_SOCKET_CREATION_ATTEMPTS = Integer(ENV['SLAVE_SOCKET_CREATION_ATTEMPTS'] || 42)
  DEFAULT_PULSE_RATE = Float(ENV['SLAVE_PULSE_RATE'] || 8)
  DEFAULT_DEBUG = (ENV['SLAVE_DEBUG'] ? true : false)

  @socket_creation_attempts = DEFAULT_SOCKET_CREATION_ATTEMPTS
  @pulse_rate = DEFAULT_PULSE_RATE
  @debug = DEFAULT_DEBUG
  #
  # class methods
  #
  class << self
  #--{{{
    # defines how many attempts will be made to create a temporary unix domain
    # socket
    attr_accessor :socket_creation_attempts

    # defines the pulse rate handed to the Heartbeat object
    attr_accessor :pulse_rate

    # if this is true and you are running from a terminal information is printed
    # on STDERR
    attr_accessor :debug

    # get a default value - key names one of the class level accessors above
    def default key
    #--{{{
      send key
    #--}}}
    end

    #
    # returns a lambda which looks up an option in opts, trying the key as
    # given, as a string, and as a symbol (in that order).  the lambda takes an
    # optional second argument which is returned when no key is found (nil
    # otherwise).  raises ArgumentError unless opts responds to both
    # 'has_key?' and '[]'.
    #
    #   getopt = Slave.getopts 'a' => 1
    #   getopt[:a]     #=> 1
    #   getopt[:b, 42] #=> 42
    #
    def getopts opts
    #--{{{
      raise ArgumentError, opts.class unless
        opts.respond_to?('has_key?') and opts.respond_to?('[]')

      lambda do |key, *defval|
        candidates = [key, key.to_s, key.to_s.intern]
        found = candidates.detect{|k| opts.has_key? k }
        found ? opts[found] : defval.first
      end
    #--}}}
    end

    # just fork with out silly warnings - $VERBOSE is silenced for the
    # duration of the call and restored afterwards
    def fork &block
    #--{{{
      v = $VERBOSE
      begin
        $VERBOSE = nil
        Process.fork(&block)
      ensure
        $VERBOSE = v
      end
    #--}}}
    end
  #--}}}
  end

  #
  # readers.  note that the 'at_exit' reader shadows Kernel#at_exit inside
  # instances, which is why the code below always calls Kernel.at_exit
  # explicitly.  there is deliberately no reader for @shutdown: the shutdown
  # method defined below would override it - use shutdown? instead.
  #
  attr_reader :obj
  attr_reader :socket_creation_attempts
  attr_reader :pulse_rate
  attr_reader :debug
  attr_reader :psname
  attr_reader :at_exit
  attr_reader :status
  attr_reader :object
  attr_reader :pid
  attr_reader :ppid
  attr_reader :uri
  attr_reader :socket

  #
  # opts may contain the keys 'object', 'socket_creation_attempts',
  # 'pulse_rate', 'psname', 'at_exit', 'dumped', or 'debug'
  #
  # forks a child which serves the slave object over a drbunix socket created
  # under Dir::tmpdir, then, in the parent, connects a DRbObject to it which is
  # available via #object.
  #
  # raises ArgumentError if neither an 'object' nor a block is given.  raises
  # RuntimeError in the parent if the child's socket cannot be found.  if
  # creating the object in the child (or running the block against it) raised,
  # an exception of the same class and message is re-raised in the parent.
  #
  def initialize opts = {}, &block
  #--{{{
    getopt = getopts opts

    @obj = getopt['object']
    @socket_creation_attempts = getopt['socket_creation_attempts'] || default('socket_creation_attempts')
    @pulse_rate = getopt['pulse_rate'] || default('pulse_rate')
    @debug = getopt['debug'] || default('debug')
    @psname = getopt['psname']
    @at_exit = getopt['at_exit']
    @dumped = getopt['dumped']

    raise ArgumentError, 'no slave object!' if
      @obj.nil? and block.nil?

    @shutdown = false
    @waiter = @status = nil

    # everything below is created before the fork so both processes share it
    @heartbeat = Heartbeat::new @pulse_rate, @debug
    @r, @w = IO::pipe     # child -> parent : carries the socket path
    @r2, @w2 = IO::pipe   # read by the parent's at_exit thread, see below

    # on failure the child serves a stand-in object carrying the marshaled
    # exception so the parent can re-raise it.
    # weird syntax because dot/rdoc chokes on this!?!?
    init_failure = lambda do |e|
      o = Object.new
      class << o
        attr_accessor '__slave_object_failure__'
      end
      o.__slave_object_failure__ = Marshal.dump [e.class, e.message, e.backtrace]
      @object = o
    end

    #
    # child
    #
    unless((@pid = Slave::fork))
      e = nil
      begin
        # registered last so it runs first: at_exit handlers inherited from
        # the parent are skipped in the child.
        # NOTE(review): exit! is called without a status here, so the status
        # passed to exit in the ensure below may not be the one the parent
        # observes - confirm.
        Kernel.at_exit{ Kernel.exit! }

        if @obj
          @object = @obj
        else
          begin
            @object = block.call
          rescue Exception => e
            init_failure[e]
          end
        end

        if block and @obj
          begin
            block[@obj]
          rescue Exception => e
            init_failure[e]
          end
        end

        $0 = (@psname ||= gen_psname(@object))

        # serve by reference unless the caller asked for a dumped (by value)
        # object or we are serving the failure stand-in
        unless @dumped or @object.respond_to?('__slave_object_failure__')
          @object.extend DRbUndumped
        end

        @ppid, @pid = Process::ppid, Process::pid
        @r.close
        @r2.close
        @socket = nil
        @uri = nil

        tmpdir, basename = Dir::tmpdir, File::basename(@psname)

        # try successive socket names until one is not already in use
        @socket_creation_attempts.times do |attempt|
          begin
            s = File::join(tmpdir, "#{ basename }_#{ attempt }")
            u = "drbunix://#{ s }"
            DRb::start_service u, @object
            @socket = s
            @uri = u
            trace{ "child - socket <#{ @socket }>" }
            trace{ "child - uri <#{ @uri }>" }
            break
          rescue Errno::EADDRINUSE
            nil
          end
        end

        if @socket and @uri
          @heartbeat.start

          # the parent's #shutdown sends SIGUSR2
          trap('SIGUSR2') do
            # @heartbeat.stop rescue nil
            # the DRb constant was previously misspelled here, so the
            # resulting NameError was swallowed by 'rescue nil' and the drb
            # thread was never actually killed
            DRb::thread.kill rescue nil
            FileUtils::rm_f @socket rescue nil
            exit
          end

          # publish the socket path to the parent, then serve until killed
          @w.write @socket
          @w.close
          DRb::thread.join
        else
          # closing without writing makes the parent read an empty path
          @w.close
        end
      rescue Exception => e
        trace{ %Q[#{ e.message } (#{ e.class })\n#{ e.backtrace.join "\n" }] }
      ensure
        # the child must never return from initialize into the caller's code
        status = e.respond_to?('status') ? e.status : 1
        exit(status)
      end
    #
    # parent
    #
    else
      detach
      @w.close
      @w2.close
      @socket = @r.read   # returns once the child closes its end of the pipe
      @r.close

      trace{ "parent - socket <#{ @socket }>" }

      if @at_exit
        @at_exit_thread = Thread.new{
          Thread.current.abort_on_exception = true

          # the child never writes to this pipe, so the read is expected to
          # return only once the child's write end has been closed
          @r2.read rescue 42

          if @at_exit.respond_to? 'call'
            @at_exit.call self
          else
            send @at_exit.to_s, self
          end
        }
      end

      if @socket and File::exist? @socket
        Kernel.at_exit{ FileUtils::rm_f @socket }
        @uri = "drbunix://#{ socket }"
        trace{ "parent - uri <#{ @uri }>" }
        @heartbeat.start
        #
        # starting drb on localhost avoids dns lookups!
        #
        DRb::start_service('druby://localhost:0', nil) unless DRb::thread
        @object = DRbObject::new nil, @uri
        if @object.respond_to? '__slave_object_failure__'
          # the payload was produced by init_failure in our own child
          c, m, bt = Marshal.load @object.__slave_object_failure__
          (e = c.new(m)).set_backtrace bt
          raise e
        end
        @psname ||= gen_psname(@object)
      else
        raise "failed to find slave socket <#{ @socket }>"
      end
    end
  #--}}}
  end

  #
  # starts a thread to collect the child status and sets up at_exit handler to
  # prevent zombies.  the at_exit handler is canceled if the thread is able to
  # collect the status
  #
  def detach
  #--{{{
    reap = lambda do |cid|
      begin
        @status = Process::waitpid2(cid).last
      rescue Exception => e
        m, c, b = e.message, e.class, e.backtrace.join("\n")
        warn "#{ m } (#{ c })\n#{ b }"  unless e.is_a? Errno::ECHILD
      end
    end

    Kernel.at_exit do
      shutdown rescue nil
      reap[@pid] rescue nil
    end

    @waiter =
      Thread.new do
        begin
          @status = Process::waitpid2(@pid).last
        ensure
          # the at_exit block above closes over 'reap', so rebinding it here
          # turns the at_exit reaping into a no-op
          reap = lambda{|cid| 'no-op' }
        end
      end
  #--}}}
  end

  #
  # wait for slave to finish.  the block, if given, is called with the child's
  # exit status and its value is returned.  if the keyword 'non_block'=>true
  # is given a thread is returned to do the waiting in an async fashion.  eg
  #
  #   thread = slave.wait(:non_block=>true){|value| "background <#{ value }>"}
  #
  def wait opts = {}, &b
  #--{{{
    b ||= lambda{|exit_status|}
    non_block = getopts(opts)['non_block']
    non_block ? Thread.new{ b[ @waiter.value ] } : b[ @waiter.value ]
  #--}}}
  end
  alias :wait2 :wait

  #
  # stops the heartbeat thread and kills the child process - give the key
  # 'quiet' to ignore errors shutting down, including having already shutdown
  #
  def shutdown opts = {}
  #--{{{
    quiet = getopts(opts)['quiet']
    raise "already shutdown" if @shutdown unless quiet
    failure = lambda{ raise $! unless quiet }
    @heartbeat.stop rescue failure.call
    Process::kill('SIGUSR2', @pid) rescue failure.call
    @shutdown = true
  #--}}}
  end

  #
  # true if #shutdown has completed
  #
  def shutdown?
  #--{{{
    @shutdown
  #--}}}
  end

  #
  # generate a default name to appear in ps/top
  #
  def gen_psname obj
  #--{{{
    "#{ obj.class }_#{ obj.object_id }_#{ Process::ppid }_#{ Process::pid }".downcase.gsub(%r/\s+/,'_')
  #--}}}
  end

  #
  # see docs for Slave.default
  #
  def default key
  #--{{{
    self.class.default key
  #--}}}
  end

  #
  # see docs for Slave.getopts
  #
  def getopts opts
  #--{{{
    self.class.getopts opts
  #--}}}
  end

  #
  # debugging output - ENV['SLAVE_DEBUG']=1 to enable.  the block is only
  # evaluated when debugging is on and STDERR is a terminal
  #
  def trace
  #--{{{
    STDERR.puts(yield) if @debug and STDERR.tty?
  #--}}}
  end

  #
  # the Heartbeat class is essentially a wrapper over an IPC channel (a pipe
  # created before the fork) used to tie the life of the child to the life of
  # the parent.  the process which created the Heartbeat keeps the write end
  # open; any other process blocks reading the other end and exits as soon as
  # that read returns.  in this way it is ensured that Slave objects cannot
  # continue to live without their parent being alive.
  #
  # an earlier ping based implementation of start/parent_start/child_start
  # used to be defined ahead of the methods below and was silently overridden
  # by them; that dead code has been removed.  pulse_rate is still accepted
  # and stored for compatibility but is not used by the current mechanism.
  #
  class Heartbeat
  #--{{{
    def initialize pulse_rate = 4.2, debug = false
    #--{{{
      @pulse_rate = Float pulse_rate
      @debug = debug
      @r, @w = IO::pipe
      @pid = Process::pid   # the creating process counts as the parent
      @ppid = nil           # filled in by child_start
      @cid = nil
      @thread = nil
      @whoami = nil
      @beating = nil
      @pipe = nil
    #--}}}
    end

    #
    # picks a role by comparing the current pid with the pid recorded in
    # initialize, closes the unused end of the pipe, and starts the matching
    # thread
    #
    def start
    #--{{{
      if Process::pid == @pid
        @r.close
        @pipe = @w
        # close-on-exec keeps the write end out of programs this process execs
        @pipe.fcntl Fcntl::F_SETFD, Fcntl::FD_CLOEXEC
        parent_start
      else
        @w.close
        @pipe = @r
        child_start
      end
      @beating = true
    #--}}}
    end

    #
    # the parent side only has to hold the write end open - the thread just
    # sleeps until killed by #stop, closing the pipe on its way out
    #
    def parent_start
    #--{{{
      @whoami = 'parent'
      @thread =
        Thread::new(Thread::current) do |cur|
          begin
            sleep
          rescue => e
            cur.raise e
          ensure
            @pipe.close rescue nil
          end
        end
    #--}}}
    end

    #
    # the child side blocks reading the pipe and calls exit once the read
    # returns.  errors are re-raised in the thread which called #start
    #
    def child_start
    #--{{{
      @whoami = 'child'
      @pid = Process::pid
      @ppid = Process::ppid
      @thread =
        Thread::new(Thread::current) do |cur|
          begin
            trace{ "child reading..." }
            @pipe.read
            trace{ "child read." }
            trace{ "child exiting." }
            exit
          rescue => e
            cur.raise e
          ensure
            @pipe.close rescue nil
          end
        end
    #--}}}
    end

    #
    # kills the heartbeat thread and closes this side of the pipe.  raises
    # RuntimeError if the heartbeat was never started or is already stopped
    #
    def stop
    #--{{{
      raise "not beating" unless @beating
      @thread.kill
      @pipe.close rescue nil
      @beating = false
    #--}}}
    end

    # debugging output - see Slave#trace
    def trace
    #--{{{
      STDERR.puts(yield) if @debug and STDERR.tty?
    #--}}}
    end
  #--}}}
  end # class Heartbeat
#--}}}
end # class Slave