lib/slave.rb in slave-1.0.0 vs lib/slave.rb in slave-1.1.0

- old
+ new

@@ -1,20 +1,24 @@ require 'drb/drb' require 'fileutils' require 'tmpdir' require 'tempfile' require 'fcntl' +require 'socket' +require 'sync' +# TODO - lifeline need close-on-exec set in it! + # # the Slave class encapsulates the work of setting up a drb server in another -# process running on localhost. the slave process is attached to it's parent -# via a Heartbeat which is designed such that the slave cannot out-live it's -# parent and become a zombie, even if the parent dies and early death, such as -# by 'kill -9'. the concept and purpose of the Slave class is to be able to -# setup any server object in another process so easily that using a -# multi-process, drb/ipc, based design is as easy, or easier, than a -# multi-threaded one. eg +# process running on localhost via unix domain sockets. the slave process is +# attached to it's parent via a LifeLine which is designed such that the slave +# cannot out-live it's parent and become a zombie, even if the parent dies and +# early death, such as by 'kill -9'. the concept and purpose of the Slave +# class is to be able to setup any server object in another process so easily +# that using a multi-process, drb/ipc, based design is as easy, or easier, +# than a multi-threaded one. eg # # class Server # def add_two n # n + 2 # end @@ -34,43 +38,43 @@ # # of the two 'b' is preferred. # class Slave #--{{{ - VERSION = '1.0.0' + VERSION = '1.1.0' def self.version() VERSION end # - # config + # env config # - DEFAULT_SOCKET_CREATION_ATTEMPTS = - Integer(ENV['SLAVE_SOCKET_CREATION_ATTEMPTS'] || 42) - - DEFAULT_PULSE_RATE = - Float(ENV['SLAVE_PULSE_RATE'] || 8) - - DEFAULT_DEBUG = - (ENV['SLAVE_DEBUG'] ? true : false) - + DEFAULT_SOCKET_CREATION_ATTEMPTS = Integer(ENV['SLAVE_SOCKET_CREATION_ATTEMPTS'] || 42) + DEFAULT_DEBUG = (ENV['SLAVE_DEBUG'] ? true : false) + DEFAULT_THREADSAFE = (ENV['SLAVE_THREADSAFE'] ? true : false) + # + # class initialization + # @socket_creation_attempts = DEFAULT_SOCKET_CREATION_ATTEMPTS - @pulse_rate = DEFAULT_PULSE_RATE @debug = DEFAULT_DEBUG + @threadsafe = DEFAULT_THREADSAFE # # class methods # class << self #--{{{ # defineds how many attempts will be made to create a temporary unix domain # socket attr :socket_creation_attempts, true - # defined the rate of pinging in the Heartbeat object - attr :pulse_rate, true - # if this is true and you are running from a terminal information is printed # on STDERR attr :debug, true + # if this is true all slave objects will be wrapped such that any call + # to the object is threadsafe. if you do not use this you must ensure + # that your objects are threadsafe __yourself__ as this is required of + # any object acting as a drb server + attr :threadsafe, true + # get a default value def default key #--{{{ send key #--}}} @@ -103,54 +107,240 @@ #--}}} end #--}}} end + # + # helper classes + # + # + # ThreadSafe is a delegate wrapper class used for implementing gross thread + # safety around existing objects. when an object is wrapped with this class + # as + # + # ts = ThreadSafe.new{ AnyObject.new } + # + # then ts can be used exactly as the normal object would have been, only all + # calls are now thread safe. this is the mechanism behind the + # 'threadsafe'/:threadsafe keyword to Slave#initialize + # + class ThreadSafe +#--{{{ + instance_methods.each{|m| undef_method unless m[%r/__/]} + def initialize object + @object = object + @sync = Sync.new + end + def ex + @sync.synchronize{ yield } + end + def method_missing m, *a, &b + ex{ @object.send m, *a, &b } + end + def respond_to? m + ex{ @object.respond_to? m } + end + def inspect + ex{ @object.inspect } + end + def class + ex{ @object.class } + end +#--}}} + end + # + # a simple thread safe hash used to map object_id to a set of file + # descriptors in the LifeLine class. see LifeLine::FDS + # + class ThreadSafeHash < Hash + def self.new(*a, &b) ThreadSafe.new(super) end + end + # + # the LifeLine class is used to communitacte between child and parent + # processes and to prevent child processes from ever becoming zombies or + # otherwise abandoned by their parents. the basic concept is that a socket + # pair is setup between child and parent. the child process, because it is + # a Slave, sets up a handler such that, should it's socket ever grow stale, + # will exit the process. this class replaces the HeartBeat class from + # previous Slave versions. + # + class LifeLine +#--{{{ + FDS = ThreadSafeHash.new + + def initialize + @pair = Socket.pair Socket::AF_UNIX, Socket::SOCK_STREAM, 0 + @owner = Process.pid + @pid = nil + @socket = nil + @object_id = object_id + + @fds = @pair.map{|s| s.fileno} + oid, fds = @object_id, @fds + FDS[oid] = fds + ObjectSpace.define_finalizer(self){ FDS.delete oid } + end + + def owner? + Process.pid == @owner + end + + def throw *ignored + raise unless owner? + @pair[-1].close + @pair[-1] = nil + @pid = Process.pid + @socket = @pair[0] + @socket.sync = true + end + + def catch *ignored + raise if owner? + @pair[0].close + @pair[0] = nil + @pid = Process.pid + @socket = @pair[-1] + @socket.sync = true + close_unused_sockets_after_forking + end + + def close_unused_sockets_after_forking + begin + to_delete = [] + begin + FDS.each do |oid, fds| + next if oid == @object_id + begin + IO.for_fd(fds.first).close + rescue Exception => e + STDERR.puts "#{ e.message } (#{ e.class })\n#{ e.backtrace.join 10.chr }" + ensure + to_delete << oid + end + end + ensure + FDS.ex{ to_delete.each{|oid| FDS.delete oid rescue 42} } + end + GC.start + rescue Exception => e + 42 + end + end + + def cut + raise unless owner? + raise unless @socket + @socket.close rescue nil + FDS.delete object_id + end + alias_method "release", "cut" + + DELEGATED = %w( puts gets read write close flush each ) + + DELEGATED.each do |m| + code = <<-code + def #{ m }(*a, &b) + raise unless @socket + @socket.#{ m } *a, &b + end + code + module_eval code, __FILE__, __LINE__ + end + + def on_cut &b + at_exit{ begin; b.call; ensure; b = nil; end if b} + Thread.new(Thread.current){|current| + Thread.current.abort_on_exception = true + begin + each{|*a|} + rescue Exception + current.raise $! + 42 + ensure + begin; b.call; ensure; b = nil; end if b + end + } + end + + def cling &b + on_cut{ begin; b.call if b; ensure; Kernel.exit; end }.join + end +#--}}} + end + + # + # attrs + # attr :obj attr :socket_creation_attempts - attr :pulse_rate attr :debug attr :psname attr :at_exit + attr :dumped attr :shutdown attr :status attr :object attr :pid attr :ppid attr :uri attr :socket - # - # opts may contain the keys 'object', 'socket_creation_attempts', - # 'pulse_rate', 'psname', 'dumped', or 'debug' + # sets up a child process serving any object as a DRb server running locally + # on unix domain sockets. the child process has a LifeLine established + # between it and the parent, making it impossible for the child to outlive + # the parent (become a zombie). the object to serve is specfied either + # directly using the 'object'/:object keyword # + # Slave.new :object => MyServer.new + # + # or, preferably, using the block form + # + # Slave.new{ MyServer.new } + # + # when the block form is used the object is contructed in the child process + # itself. this is quite advantageous if the child object consumes resources + # or opens file handles (db connections, etc). by contructing the object in + # the child any resources are consumed from the child's address space and + # things like open file handles will not be carried into subsequent child + # processes (via standard unix fork semantics). in the event that a block + # is specified but the object cannot be constructed and, instead, throws and + # Exception, that exception will be propogated to the parent process. + # + # opts may contain the following keys, as either strings or symbols + # + # object : specify the slave object. otherwise block value is used. + # socket_creation_attempts : specify how many attempts to create a unix domain socket will be made + # debug : turn on some logging to STDERR + # psname : specify the name that will appear in 'top' ($0) + # at_exit : specify a lambda to be called in the *parent* when the child dies + # dumped : specify that the slave object should *not* be DRbUndumped (default is DRbUndumped) + # threadsafe : wrap the slave object with ThreadSafe to implement gross thread safety + # def initialize opts = {}, &block #--{{{ getopt = getopts opts @obj = getopt['object'] @socket_creation_attempts = getopt['socket_creation_attempts'] || default('socket_creation_attempts') - @pulse_rate = getopt['pulse_rate'] || default('pulse_rate') @debug = getopt['debug'] || default('debug') @psname = getopt['psname'] @at_exit = getopt['at_exit'] @dumped = getopt['dumped'] + @threadsafe = getopt['threadsafe'] || default('threadsafe') - raise ArgumentError, 'no slave object!' if + raise ArgumentError, 'no slave object or slave object block provided!' if @obj.nil? and block.nil? @shutdown = false @waiter = @status = nil + @lifeline = LifeLine.new - @heartbeat = Heartbeat::new @pulse_rate, @debug - @r, @w = IO::pipe - @r2, @w2 = IO::pipe - # weird syntax because dot/rdoc chokes on this!?!? init_failure = lambda do |e| + trace{ %Q[#{ e.message } (#{ e.class })\n#{ e.backtrace.join "\n" }] } o = Object.new class << o attr_accessor '__slave_object_failure__' end o.__slave_object_failure__ = Marshal.dump [e.class, e.message, e.backtrace] @@ -162,10 +352,11 @@ # unless((@pid = Slave::fork)) e = nil begin Kernel.at_exit{ Kernel.exit! } + @lifeline.catch if @obj @object = @obj else begin @@ -182,27 +373,29 @@ init_failure[e] end end $0 = (@psname ||= gen_psname(@object)) + unless @dumped or @object.respond_to?('__slave_object_failure__') @object.extend DRbUndumped end - @ppid, @pid = Process::ppid, Process::pid + if @threadsafe + @object = ThreadSafe.new @object + end - @r.close - @r2.close + @ppid, @pid = Process::ppid, Process::pid @socket = nil @uri = nil tmpdir, basename = Dir::tmpdir, File::basename(@psname) @socket_creation_attempts.times do |attempt| se = nil begin - s = File::join(tmpdir, "#{ basename }_#{ attempt }") + s = File::join(tmpdir, "#{ basename }_#{ attempt }_#{ rand }") u = "drbunix://#{ s }" DRb::start_service u, @object @socket = s @uri = u trace{ "child - socket <#{ @socket }>" } @@ -212,24 +405,22 @@ nil end end if @socket and @uri - @heartbeat.start - trap('SIGUSR2') do - # @heartbeat.stop rescue nil DBb::thread.kill rescue nil FileUtils::rm_f @socket rescue nil exit end - @w.write @socket - @w.close - DRb::thread.join + @lifeline.puts @socket + @lifeline.cling else - @w.close + @lifeline.release + warn "slave(#{ $$ }) could not create socket!" + exit end rescue Exception => e trace{ %Q[#{ e.message } (#{ e.class })\n#{ e.backtrace.join "\n" }] } ensure status = e.respond_to?('status') ? e.status : 1 @@ -238,44 +429,36 @@ # # parent # else detach - @w.close - @w2.close - @socket = @r.read - @r.close + @lifeline.throw + buf = @lifeline.gets + raise "failed to find slave socket" if buf.nil? or buf.strip.empty? + @socket = buf.strip trace{ "parent - socket <#{ @socket }>" } if @at_exit - @at_exit_thread = Thread.new{ - Thread.current.abort_on_exception = true - - @r2.read rescue 42 - - if @at_exit.respond_to? 'call' - @at_exit.call self - else - send @at_exit.to_s, self - end + @at_exit_thread = @lifeline.on_cut{ + @at_exit.respond_to?('call') ? @at_exit.call(self) : send(@at_exit.to_s, self) } end if @socket and File::exist? @socket Kernel.at_exit{ FileUtils::rm_f @socket } @uri = "drbunix://#{ socket }" trace{ "parent - uri <#{ @uri }>" } - @heartbeat.start # # starting drb on localhost avoids dns lookups! # DRb::start_service('druby://localhost:0', nil) unless DRb::thread @object = DRbObject::new nil, @uri if @object.respond_to? '__slave_object_failure__' c, m, bt = Marshal.load @object.__slave_object_failure__ (e = c.new(m)).set_backtrace bt + trace{ %Q[#{ e.message } (#{ e.class })\n#{ e.backtrace.join "\n" }] } raise e end @psname ||= gen_psname(@object) else raise "failed to find slave socket <#{ @socket }>" @@ -327,20 +510,20 @@ non_block ? Thread.new{ b[ @waiter.value ] } : b[ @waiter.value ] #--}}} end alias :wait2 :wait # - # stops the heartbeat thread and kills the child process - give the key - # 'quiet' to ignore errors shutting down, including having already shutdown + # cuts the lifeline and kills the child process - give the key 'quiet' to + # ignore errors shutting down, including having already shutdown # def shutdown opts = {} #--{{{ quiet = getopts(opts)['quiet'] raise "already shutdown" if @shutdown unless quiet - failure = lambda{ raise $! unless quiet } - @heartbeat.stop rescue failure.call - Process::kill('SIGUSR2', @pid) rescue failure.call + begin; Process::kill 'SIGUSR2', @pid; rescue Exception => e; end + begin; @lifeline.cut; rescue Exception; end + raise e if e unless quiet @shutdown = true #--}}} end # # true @@ -353,11 +536,11 @@ # # generate a default name to appear in ps/top # def gen_psname obj #--{{{ - "#{ obj.class }_#{ obj.object_id }_#{ Process::ppid }_#{ Process::pid }".downcase.gsub(%r/\s+/,'_') + "slave_#{ obj.class }_#{ obj.object_id }_#{ Process::ppid }_#{ Process::pid }".downcase.gsub(%r/\s+/,'_') #--}}} end # # see docs for Slave.default # @@ -377,157 +560,64 @@ # # debugging output - ENV['SLAVE_DEBUG']=1 to enable # def trace #--{{{ - STDERR.puts(yield) if @debug and STDERR.tty? + if @debug + STDERR.puts yield + STDERR.flush + end #--}}} end + # - # the Heartbeat class is essentially wrapper over an IPC channel that sends - # a ping on the channel indicating process health. if either end of the - # channel is detached the ping will fail and an error will be raised. in - # this way it is ensured that Slave objects cannot continue to live without - # their parent being alive. + # a simple convenience method which returns an *object* from another + # process. the object returned is the result of the supplied block. eg # - class Heartbeat + # object = Slave.object{ processor_intensive_object_built_in_child_process() } + # + # eg. + # + # the call can be made asynchronous via the 'async'/:async keyword + # + # thread = Slave.object(:async=>true){ long_processor_intensive_object_built_in_child_process() } + # + # # go on about your coding business then, later + # + # object = thread.value + # + def self.object opts = {}, &b #--{{{ - def initialize pulse_rate = 4.2, debug = false -#--{{{ - @pulse_rate = Float pulse_rate - @debug = debug - @r, @w = IO::pipe - @pid = Process::pid - @ppid = Process::ppid - @cid = nil - @thread = nil - @ppid = nil - @whoami = nil - @beating = nil - @pipe = nil + l = lambda{ begin; b.call; ensure; exit; end } + + async = opts.delete('async') || opts.delete(:async) + + opts['object'] = opts[:object] = l + opts['dumped'] = opts[:dumped] = true + + slave = Slave.new opts + + async ? Thread.new{ slave.object.call } : slave.object.call #--}}} - end - def start + end + def self.object opts = {}, &b #--{{{ - if Process::pid == @pid - @w.close - @pipe = @r - @pipe.fcntl Fcntl::F_SETFD, Fcntl::FD_CLOEXEC - parent_start - else - @r.close - @pipe = @w - child_start + async = opts.delete('async') || opts.delete(:async) + + opts['object'] = opts[:object] = lambda(&b) + opts['dumped'] = opts[:dumped] = true + + slave = Slave.new opts + + value = lambda do |slave| + begin + slave.object.call + ensure + slave.shutdown end - @beating = true -#--}}} end - def parent_start -#--{{{ - @whoami = 'parent' - @thread = - Thread::new(Thread::current) do |cur| - begin - loop do - buf = @pipe.gets - trace{ "<#{ @whoami }> <#{ @pid }> gets <#{ buf.inspect }>" } - @cid = Integer buf.strip if @cid.nil? and buf =~ %r/^\s*\d+\s*$/ - end - rescue => e - cur.raise e - ensure - @pipe.close rescue nil - end - end -#--}}} - end - def child_start -#--{{{ - @whoami = 'child' - @pid = Process::pid - @ppid = Process::ppid - @thread = - Thread::new(Thread::current) do |cur| - begin - loop do - trace{ "<#{ @whoami }> <#{ @pid }> puts <#{ @pid }>" } - @pipe.puts @pid - Process::kill 0, @ppid - sleep @pulse_rate - end - rescue => e - cur.raise e - ensure - @pipe.close rescue nil - end - end -#--}}} - end - def start -#--{{{ - if Process::pid == @pid - @r.close - @pipe = @w - @pipe.fcntl Fcntl::F_SETFD, Fcntl::FD_CLOEXEC - parent_start - else - @w.close - @pipe = @r - child_start - end - @beating = true + async ? Thread.new{ value[slave] } : value[slave] #--}}} - end - def parent_start -#--{{{ - @whoami = 'parent' - @thread = - Thread::new(Thread::current) do |cur| - begin - sleep - rescue => e - cur.raise e - ensure - @pipe.close rescue nil - end - end -#--}}} - end - def child_start -#--{{{ - @whoami = 'child' - @pid = Process::pid - @ppid = Process::ppid - @thread = - Thread::new(Thread::current) do |cur| - begin - trace{ "child reading..." } - @pipe.read - trace{ "child read." } - trace{ "child exiting." } - exit - rescue => e - cur.raise e - ensure - @pipe.close rescue nil - end - end -#--}}} - end - def stop -#--{{{ - raise "not beating" unless @beating - @thread.kill - @pipe.close rescue nil - @beating = false -#--}}} - end - def trace -#--{{{ - STDERR.puts(yield) if @debug and STDERR.tty? -#--}}} - end -#--}}} - end # class Heartbeat + end #--}}} end # class Slave