require 'drb/drb'
require 'fileutils'
require 'tmpdir'
require 'tempfile'
require 'fcntl'

#
# the Slave class encapsulates the work of setting up a drb server in another
# process running on localhost.  the slave process is attached to its parent
# via a Heartbeat which is designed such that the slave cannot out-live its
# parent and become a zombie, even if the parent dies an early death, such as
# by 'kill -9'.  the concept and purpose of the Slave class is to be able to
# set up any server object in another process so easily that using a
# multi-process, drb/ipc, based design is as easy, or easier, than a
# multi-threaded one.  eg
#
#   class Server
#     def add_two n
#       n + 2
#     end
#   end
#
#   slave = Slave.new Server.new
#   server = slave.object
#
#   p server.add_two(40) #=> 42
#
class Slave
#--{{{
  VERSION = '0.2.0'
  #
  # config - each default may be overridden through the environment
  #
  DEFAULT_SOCKET_CREATION_ATTEMPTS =
    Integer(ENV['SLAVE_SOCKET_CREATION_ATTEMPTS'] || 42)

  DEFAULT_PULSE_RATE =
    Float(ENV['SLAVE_PULSE_RATE'] || 8)

  DEFAULT_DEBUG = (ENV['SLAVE_DEBUG'] ? true : false)

  @socket_creation_attempts = DEFAULT_SOCKET_CREATION_ATTEMPTS
  @pulse_rate = DEFAULT_PULSE_RATE
  @debug = DEFAULT_DEBUG
  #
  # class methods
  #
  class << self
  #--{{{
    # defines how many attempts will be made to create a temporary unix domain
    # socket
    attr_accessor :socket_creation_attempts

    # defines the pulse rate handed to the Heartbeat object
    attr_accessor :pulse_rate

    # if this is true and you are running from a terminal information is printed
    # on STDERR
    attr_accessor :debug

    # look up a value in an option hash, falling back to class defaults.
    #
    # 'key' is tried as given, as a String and as a Symbol.  when 'opts' does
    # not contain it the class-level reader of the same name is used, and nil
    # is returned when no such reader exists (eg. 'psname').
    def getval key, opts = {}
    #--{{{
      keys = [key, key.to_s, key.to_s.intern]
      keys.each{|k| return opts[k] if opts.key?(k)}
      name = key.to_s
      respond_to?(name) ? public_send(name) : nil
    #--}}}
    end

    # just fork without silly warnings - $VERBOSE is silenced for the
    # duration of the call and always restored.  returns what Process::fork
    # returns.
    def fork &block
    #--{{{
      v = $VERBOSE
      begin
        $VERBOSE = nil
        Process::fork(&block)
      ensure
        $VERBOSE = v
      end
    #--}}}
    end
  #--}}}
  end

  attr_reader :object
  attr_reader :obj
  attr_reader :psname
  attr_reader :pid
  attr_reader :ppid
  attr_reader :uri
  attr_reader :pulse_rate
  attr_reader :socket
  attr_reader :debug
  attr_reader :status

  #
  # 'obj' can be any object and 'opts' may contain the keys
  # 'socket_creation_attempts', 'pulse_rate', 'psname', or 'debug'.  when
  # 'obj' is nil the block is called in the child to build the server object.
  #
  # forks a child which serves the object over a drbunix socket in the tmpdir
  # and, in the parent, exposes a DRbObject proxy for it as 'object'.
  #
  # raises ArgumentError when neither 'obj' nor a block is given and
  # RuntimeError when the child did not report a usable socket.
  #
  def initialize obj = nil, opts = {}, &block
  #--{{{
    raise ArgumentError, "no slave object!" if obj.nil? && block.nil?

    @obj = obj

    @socket_creation_attempts = getval('socket_creation_attempts', opts)
    @pulse_rate = getval('pulse_rate', opts)
    @debug = getval('debug', opts)
    @psname = getval('psname', opts) || gen_psname(@obj)

    trace{ "socket_creation_attempts <#{ @socket_creation_attempts }>" }
    trace{ "pulse_rate <#{ @pulse_rate }>" }
    trace{ "psname <#{ @psname }>" }

    @shutdown = false
    @waiter = @status = nil

    # both pipes must exist before the fork so each side can keep its own end
    @heartbeat = Heartbeat::new @pulse_rate, @debug
    @r, @w = IO::pipe

    @pid = Slave::fork
    #
    # child
    #
    if @pid.nil?
      e = nil
      begin
        $0 = @psname
        @pid = Process::pid
        @ppid = Process::ppid

        @r.close
        @socket = nil
        @uri = nil

        tmpdir = Dir::tmpdir
        basename = File::basename @psname

        server = @obj || block.call

        # probe socket names until one is free
        @socket_creation_attempts.times do |attempt|
          begin
            s = File::join(tmpdir, "#{ basename }_#{ attempt }")
            u = "drbunix://#{ s }"
            DRb::start_service u, server
            @socket = s
            @uri = u
            trace{ "child - socket <#{ @socket }>" }
            trace{ "child - uri <#{ @uri }>" }
            break
          rescue Errno::EADDRINUSE
            nil
          end
        end

        if @socket && @uri
          @heartbeat.start

          # tell the parent where the socket lives - closing signals EOF
          @w.write @socket
          @w.close

          trap('SIGUSR2') do
            # @heartbeat.stop rescue nil
            DRb::thread.kill rescue nil
            FileUtils::rm_f @socket rescue nil
            exit!
          end

          block[obj] if block && obj

          DRb::thread.join
        else
          # nothing written: the parent reads an empty string and raises
          @w.close
        end
      rescue Exception => e
        trace{ %Q[#{ e.message } (#{ e.class })\n#{ e.backtrace.join "\n" }] }
      ensure
        # the child must never return into the parent's code path
        status = e.respond_to?('status') ? e.status : 1
        exit!(status)
      end
    #
    # parent
    #
    else
      #Process::detach @pid
      detach
      @w.close
      @socket = @r.read
      @r.close

      trace{ "parent - socket <#{ @socket }>" }

      if @socket && File::exist?(@socket)
        at_exit{ FileUtils::rm_f @socket }
        @uri = "drbunix://#{ socket }"
        trace{ "parent - uri <#{ @uri }>" }
        @heartbeat.start
        #
        # starting drb on localhost avoids dns lookups!
        #
        DRb::start_service('druby://localhost:0', nil) unless DRb::thread
        @object = DRbObject::new nil, @uri
      else
        raise "failed to find slave socket <#{ @socket }>"
      end
    end
  #--}}}
  end
  #
  # starts a thread to attempt collecting the child status
  #
  def detach
  #--{{{
    @waiter = Thread.new{ @status = Process::waitpid2(@pid).last }
  #--}}}
  end
  #
  # wait for slave to finish - returns the child's Process::Status
  #
  def wait
  #--{{{
    @waiter.value
  #--}}}
  end
  alias :wait2 :wait
  #
  # stops the heartbeat thread and kills the child process.  raises
  # RuntimeError when called a second time.
  #
  def shutdown
  #--{{{
    raise "already shutdown" if @shutdown
    @heartbeat.stop rescue nil
    Process::kill('SIGUSR2', @pid) rescue nil
    Process::kill('SIGTERM', @pid) rescue nil
    FileUtils::rm_f @socket
    @shutdown = true
  #--}}}
  end
  #
  # generate a default name to appear in ps/top
  #
  def gen_psname obj
  #--{{{
    "#{ obj.class }_slave_of_#{ Process::pid }".downcase.gsub(%r/\s+/,'_')
  #--}}}
  end
  #
  # see docs for Slave.getval - 'opts' is forwarded so that per-instance
  # options take precedence over the class defaults
  #
  def getval key, opts = {}
  #--{{{
    self.class.getval key, opts
  #--}}}
  end
  #
  # debugging output - ENV['SLAVE_DEBUG']=1 to enable.  the block is only
  # evaluated when debugging is on and STDERR is a terminal.
  #
  def trace
  #--{{{
    STDERR.puts(yield) if @debug && STDERR.tty?
  #--}}}
  end

  #
  # the Heartbeat class is essentially a wrapper over an IPC channel (a pipe
  # created before the fork) that ties the life of the child to the parent.
  # the parent keeps the write end open and never writes to it; the child
  # blocks reading the other end.  once that read returns the child calls
  # exit!, so a Slave cannot continue to live without its parent.
  # 'pulse_rate' is stored but is not consulted by this mechanism.
  #
  class Heartbeat
  #--{{{
    def initialize pulse_rate = 4.2, debug = false
    #--{{{
      @pulse_rate = Float pulse_rate
      @debug = debug
      @r, @w = IO::pipe
      # pid of the creating process - used by #start to tell parent from child
      @pid = Process::pid
      @ppid = Process::ppid
      @cid = nil
      @thread = nil
      @whoami = nil
      @beating = nil
      @pipe = nil
    #--}}}
    end
    #
    # picks the parent or child role by comparing the current pid with the
    # pid recorded at construction, closes the unused pipe end and starts the
    # matching thread
    #
    def start
    #--{{{
      if Process::pid == @pid
        @r.close
        @pipe = @w
        # do not leak the write end into exec'd programs
        @pipe.fcntl Fcntl::F_SETFD, Fcntl::FD_CLOEXEC
        parent_start
      else
        @w.close
        @pipe = @r
        child_start
      end
      @beating = true
    #--}}}
    end
    #
    # parent side: a thread that sleeps forever and closes the pipe when it
    # is killed
    #
    def parent_start
    #--{{{
      @whoami = 'parent'
      @thread =
        Thread::new(Thread::current) do |cur|
          begin
            sleep
          rescue => e
            cur.raise e
          ensure
            @pipe.close rescue nil
          end
        end
    #--}}}
    end
    #
    # child side: a thread that blocks on the pipe and hard-exits the process
    # as soon as the read returns
    #
    def child_start
    #--{{{
      @whoami = 'child'
      @pid = Process::pid
      @ppid = Process::ppid
      @thread =
        Thread::new(Thread::current) do |cur|
          begin
            trace{ "child reading..." }
            @pipe.read
            trace{ "child read." }
            trace{ "child exiting." }
            exit!
          rescue => e
            cur.raise e
          ensure
            @pipe.close rescue nil
          end
        end
    #--}}}
    end
    #
    # kills the heartbeat thread and closes this side's pipe end.  raises
    # RuntimeError when the heartbeat is not running.
    #
    def stop
    #--{{{
      raise "not beating" unless @beating
      @thread.kill
      @pipe.close rescue nil
      @beating = false
    #--}}}
    end
    #
    # debugging output - only when debug is on and STDERR is a terminal
    #
    def trace
    #--{{{
      STDERR.puts(yield) if @debug && STDERR.tty?
    #--}}}
    end
  #--}}}
  end # class Heartbeat
#--}}}
end # class Slave