lib/slave.rb in slave-1.0.0 vs lib/slave.rb in slave-1.1.0
- old
+ new
@@ -1,20 +1,24 @@
require 'drb/drb'
require 'fileutils'
require 'tmpdir'
require 'tempfile'
require 'fcntl'
+require 'socket'
+require 'sync'
+# TODO - lifeline need close-on-exec set in it!
+
#
# the Slave class encapsulates the work of setting up a drb server in another
-# process running on localhost. the slave process is attached to it's parent
-# via a Heartbeat which is designed such that the slave cannot out-live it's
-# parent and become a zombie, even if the parent dies and early death, such as
-# by 'kill -9'. the concept and purpose of the Slave class is to be able to
-# setup any server object in another process so easily that using a
-# multi-process, drb/ipc, based design is as easy, or easier, than a
-# multi-threaded one. eg
+# process running on localhost via unix domain sockets. the slave process is
+# attached to it's parent via a LifeLine which is designed such that the slave
+# cannot out-live it's parent and become a zombie, even if the parent dies and
+# early death, such as by 'kill -9'. the concept and purpose of the Slave
+# class is to be able to setup any server object in another process so easily
+# that using a multi-process, drb/ipc, based design is as easy, or easier,
+# than a multi-threaded one. eg
#
# class Server
# def add_two n
# n + 2
# end
@@ -34,43 +38,43 @@
#
# of the two 'b' is preferred.
#
class Slave
#--{{{
- VERSION = '1.0.0'
+ VERSION = '1.1.0'
def self.version() VERSION end
#
- # config
+ # env config
#
- DEFAULT_SOCKET_CREATION_ATTEMPTS =
- Integer(ENV['SLAVE_SOCKET_CREATION_ATTEMPTS'] || 42)
-
- DEFAULT_PULSE_RATE =
- Float(ENV['SLAVE_PULSE_RATE'] || 8)
-
- DEFAULT_DEBUG =
- (ENV['SLAVE_DEBUG'] ? true : false)
-
+ DEFAULT_SOCKET_CREATION_ATTEMPTS = Integer(ENV['SLAVE_SOCKET_CREATION_ATTEMPTS'] || 42)
+ DEFAULT_DEBUG = (ENV['SLAVE_DEBUG'] ? true : false)
+ DEFAULT_THREADSAFE = (ENV['SLAVE_THREADSAFE'] ? true : false)
+ #
+ # class initialization
+ #
@socket_creation_attempts = DEFAULT_SOCKET_CREATION_ATTEMPTS
- @pulse_rate = DEFAULT_PULSE_RATE
@debug = DEFAULT_DEBUG
+ @threadsafe = DEFAULT_THREADSAFE
#
# class methods
#
class << self
#--{{{
# defineds how many attempts will be made to create a temporary unix domain
# socket
attr :socket_creation_attempts, true
- # defined the rate of pinging in the Heartbeat object
- attr :pulse_rate, true
-
# if this is true and you are running from a terminal information is printed
# on STDERR
attr :debug, true
+ # if this is true all slave objects will be wrapped such that any call
+ # to the object is threadsafe. if you do not use this you must ensure
+ # that your objects are threadsafe __yourself__ as this is required of
+ # any object acting as a drb server
+ attr :threadsafe, true
+
# get a default value
def default key
#--{{{
send key
#--}}}
@@ -103,54 +107,240 @@
#--}}}
end
#--}}}
end
+ #
+ # helper classes
+ #
+ #
+ # ThreadSafe is a delegate wrapper class used for implementing gross thread
+ # safety around existing objects. when an object is wrapped with this class
+ # as
+ #
+ # ts = ThreadSafe.new{ AnyObject.new }
+ #
+ # then ts can be used exactly as the normal object would have been, only all
+ # calls are now thread safe. this is the mechanism behind the
+ # 'threadsafe'/:threadsafe keyword to Slave#initialize
+ #
+ class ThreadSafe
+#--{{{
+ instance_methods.each{|m| undef_method unless m[%r/__/]}
+ def initialize object
+ @object = object
+ @sync = Sync.new
+ end
+ def ex
+ @sync.synchronize{ yield }
+ end
+ def method_missing m, *a, &b
+ ex{ @object.send m, *a, &b }
+ end
+ def respond_to? m
+ ex{ @object.respond_to? m }
+ end
+ def inspect
+ ex{ @object.inspect }
+ end
+ def class
+ ex{ @object.class }
+ end
+#--}}}
+ end
+ #
+ # a simple thread safe hash used to map object_id to a set of file
+ # descriptors in the LifeLine class. see LifeLine::FDS
+ #
+ class ThreadSafeHash < Hash
+ def self.new(*a, &b) ThreadSafe.new(super) end
+ end
+ #
+ # the LifeLine class is used to communitacte between child and parent
+ # processes and to prevent child processes from ever becoming zombies or
+ # otherwise abandoned by their parents. the basic concept is that a socket
+ # pair is setup between child and parent. the child process, because it is
+ # a Slave, sets up a handler such that, should it's socket ever grow stale,
+ # will exit the process. this class replaces the HeartBeat class from
+ # previous Slave versions.
+ #
+ class LifeLine
+#--{{{
+ FDS = ThreadSafeHash.new
+
+ def initialize
+ @pair = Socket.pair Socket::AF_UNIX, Socket::SOCK_STREAM, 0
+ @owner = Process.pid
+ @pid = nil
+ @socket = nil
+ @object_id = object_id
+
+ @fds = @pair.map{|s| s.fileno}
+ oid, fds = @object_id, @fds
+ FDS[oid] = fds
+ ObjectSpace.define_finalizer(self){ FDS.delete oid }
+ end
+
+ def owner?
+ Process.pid == @owner
+ end
+
+ def throw *ignored
+ raise unless owner?
+ @pair[-1].close
+ @pair[-1] = nil
+ @pid = Process.pid
+ @socket = @pair[0]
+ @socket.sync = true
+ end
+
+ def catch *ignored
+ raise if owner?
+ @pair[0].close
+ @pair[0] = nil
+ @pid = Process.pid
+ @socket = @pair[-1]
+ @socket.sync = true
+ close_unused_sockets_after_forking
+ end
+
+ def close_unused_sockets_after_forking
+ begin
+ to_delete = []
+ begin
+ FDS.each do |oid, fds|
+ next if oid == @object_id
+ begin
+ IO.for_fd(fds.first).close
+ rescue Exception => e
+ STDERR.puts "#{ e.message } (#{ e.class })\n#{ e.backtrace.join 10.chr }"
+ ensure
+ to_delete << oid
+ end
+ end
+ ensure
+ FDS.ex{ to_delete.each{|oid| FDS.delete oid rescue 42} }
+ end
+ GC.start
+ rescue Exception => e
+ 42
+ end
+ end
+
+ def cut
+ raise unless owner?
+ raise unless @socket
+ @socket.close rescue nil
+ FDS.delete object_id
+ end
+ alias_method "release", "cut"
+
+ DELEGATED = %w( puts gets read write close flush each )
+
+ DELEGATED.each do |m|
+ code = <<-code
+ def #{ m }(*a, &b)
+ raise unless @socket
+ @socket.#{ m } *a, &b
+ end
+ code
+ module_eval code, __FILE__, __LINE__
+ end
+
+ def on_cut &b
+ at_exit{ begin; b.call; ensure; b = nil; end if b}
+ Thread.new(Thread.current){|current|
+ Thread.current.abort_on_exception = true
+ begin
+ each{|*a|}
+ rescue Exception
+ current.raise $!
+ 42
+ ensure
+ begin; b.call; ensure; b = nil; end if b
+ end
+ }
+ end
+
+ def cling &b
+ on_cut{ begin; b.call if b; ensure; Kernel.exit; end }.join
+ end
+#--}}}
+ end
+
+ #
+ # attrs
+ #
attr :obj
attr :socket_creation_attempts
- attr :pulse_rate
attr :debug
attr :psname
attr :at_exit
+ attr :dumped
attr :shutdown
attr :status
attr :object
attr :pid
attr :ppid
attr :uri
attr :socket
-
#
- # opts may contain the keys 'object', 'socket_creation_attempts',
- # 'pulse_rate', 'psname', 'dumped', or 'debug'
+ # sets up a child process serving any object as a DRb server running locally
+ # on unix domain sockets. the child process has a LifeLine established
+ # between it and the parent, making it impossible for the child to outlive
+ # the parent (become a zombie). the object to serve is specfied either
+ # directly using the 'object'/:object keyword
#
+ # Slave.new :object => MyServer.new
+ #
+ # or, preferably, using the block form
+ #
+ # Slave.new{ MyServer.new }
+ #
+ # when the block form is used the object is contructed in the child process
+ # itself. this is quite advantageous if the child object consumes resources
+ # or opens file handles (db connections, etc). by contructing the object in
+ # the child any resources are consumed from the child's address space and
+ # things like open file handles will not be carried into subsequent child
+ # processes (via standard unix fork semantics). in the event that a block
+ # is specified but the object cannot be constructed and, instead, throws and
+ # Exception, that exception will be propogated to the parent process.
+ #
+ # opts may contain the following keys, as either strings or symbols
+ #
+ # object : specify the slave object. otherwise block value is used.
+ # socket_creation_attempts : specify how many attempts to create a unix domain socket will be made
+ # debug : turn on some logging to STDERR
+ # psname : specify the name that will appear in 'top' ($0)
+ # at_exit : specify a lambda to be called in the *parent* when the child dies
+ # dumped : specify that the slave object should *not* be DRbUndumped (default is DRbUndumped)
+ # threadsafe : wrap the slave object with ThreadSafe to implement gross thread safety
+ #
def initialize opts = {}, &block
#--{{{
getopt = getopts opts
@obj = getopt['object']
@socket_creation_attempts = getopt['socket_creation_attempts'] || default('socket_creation_attempts')
- @pulse_rate = getopt['pulse_rate'] || default('pulse_rate')
@debug = getopt['debug'] || default('debug')
@psname = getopt['psname']
@at_exit = getopt['at_exit']
@dumped = getopt['dumped']
+ @threadsafe = getopt['threadsafe'] || default('threadsafe')
- raise ArgumentError, 'no slave object!' if
+ raise ArgumentError, 'no slave object or slave object block provided!' if
@obj.nil? and block.nil?
@shutdown = false
@waiter = @status = nil
+ @lifeline = LifeLine.new
- @heartbeat = Heartbeat::new @pulse_rate, @debug
- @r, @w = IO::pipe
- @r2, @w2 = IO::pipe
-
# weird syntax because dot/rdoc chokes on this!?!?
init_failure = lambda do |e|
+ trace{ %Q[#{ e.message } (#{ e.class })\n#{ e.backtrace.join "\n" }] }
o = Object.new
class << o
attr_accessor '__slave_object_failure__'
end
o.__slave_object_failure__ = Marshal.dump [e.class, e.message, e.backtrace]
@@ -162,10 +352,11 @@
#
unless((@pid = Slave::fork))
e = nil
begin
Kernel.at_exit{ Kernel.exit! }
+ @lifeline.catch
if @obj
@object = @obj
else
begin
@@ -182,27 +373,29 @@
init_failure[e]
end
end
$0 = (@psname ||= gen_psname(@object))
+
unless @dumped or @object.respond_to?('__slave_object_failure__')
@object.extend DRbUndumped
end
- @ppid, @pid = Process::ppid, Process::pid
+ if @threadsafe
+ @object = ThreadSafe.new @object
+ end
- @r.close
- @r2.close
+ @ppid, @pid = Process::ppid, Process::pid
@socket = nil
@uri = nil
tmpdir, basename = Dir::tmpdir, File::basename(@psname)
@socket_creation_attempts.times do |attempt|
se = nil
begin
- s = File::join(tmpdir, "#{ basename }_#{ attempt }")
+ s = File::join(tmpdir, "#{ basename }_#{ attempt }_#{ rand }")
u = "drbunix://#{ s }"
DRb::start_service u, @object
@socket = s
@uri = u
trace{ "child - socket <#{ @socket }>" }
@@ -212,24 +405,22 @@
nil
end
end
if @socket and @uri
- @heartbeat.start
-
trap('SIGUSR2') do
- # @heartbeat.stop rescue nil
DBb::thread.kill rescue nil
FileUtils::rm_f @socket rescue nil
exit
end
- @w.write @socket
- @w.close
- DRb::thread.join
+ @lifeline.puts @socket
+ @lifeline.cling
else
- @w.close
+ @lifeline.release
+ warn "slave(#{ $$ }) could not create socket!"
+ exit
end
rescue Exception => e
trace{ %Q[#{ e.message } (#{ e.class })\n#{ e.backtrace.join "\n" }] }
ensure
status = e.respond_to?('status') ? e.status : 1
@@ -238,44 +429,36 @@
#
# parent
#
else
detach
- @w.close
- @w2.close
- @socket = @r.read
- @r.close
+ @lifeline.throw
+ buf = @lifeline.gets
+ raise "failed to find slave socket" if buf.nil? or buf.strip.empty?
+ @socket = buf.strip
trace{ "parent - socket <#{ @socket }>" }
if @at_exit
- @at_exit_thread = Thread.new{
- Thread.current.abort_on_exception = true
-
- @r2.read rescue 42
-
- if @at_exit.respond_to? 'call'
- @at_exit.call self
- else
- send @at_exit.to_s, self
- end
+ @at_exit_thread = @lifeline.on_cut{
+ @at_exit.respond_to?('call') ? @at_exit.call(self) : send(@at_exit.to_s, self)
}
end
if @socket and File::exist? @socket
Kernel.at_exit{ FileUtils::rm_f @socket }
@uri = "drbunix://#{ socket }"
trace{ "parent - uri <#{ @uri }>" }
- @heartbeat.start
#
# starting drb on localhost avoids dns lookups!
#
DRb::start_service('druby://localhost:0', nil) unless DRb::thread
@object = DRbObject::new nil, @uri
if @object.respond_to? '__slave_object_failure__'
c, m, bt = Marshal.load @object.__slave_object_failure__
(e = c.new(m)).set_backtrace bt
+ trace{ %Q[#{ e.message } (#{ e.class })\n#{ e.backtrace.join "\n" }] }
raise e
end
@psname ||= gen_psname(@object)
else
raise "failed to find slave socket <#{ @socket }>"
@@ -327,20 +510,20 @@
non_block ? Thread.new{ b[ @waiter.value ] } : b[ @waiter.value ]
#--}}}
end
alias :wait2 :wait
#
- # stops the heartbeat thread and kills the child process - give the key
- # 'quiet' to ignore errors shutting down, including having already shutdown
+ # cuts the lifeline and kills the child process - give the key 'quiet' to
+ # ignore errors shutting down, including having already shutdown
#
def shutdown opts = {}
#--{{{
quiet = getopts(opts)['quiet']
raise "already shutdown" if @shutdown unless quiet
- failure = lambda{ raise $! unless quiet }
- @heartbeat.stop rescue failure.call
- Process::kill('SIGUSR2', @pid) rescue failure.call
+ begin; Process::kill 'SIGUSR2', @pid; rescue Exception => e; end
+ begin; @lifeline.cut; rescue Exception; end
+ raise e if e unless quiet
@shutdown = true
#--}}}
end
#
# true
@@ -353,11 +536,11 @@
#
# generate a default name to appear in ps/top
#
def gen_psname obj
#--{{{
- "#{ obj.class }_#{ obj.object_id }_#{ Process::ppid }_#{ Process::pid }".downcase.gsub(%r/\s+/,'_')
+ "slave_#{ obj.class }_#{ obj.object_id }_#{ Process::ppid }_#{ Process::pid }".downcase.gsub(%r/\s+/,'_')
#--}}}
end
#
# see docs for Slave.default
#
@@ -377,157 +560,64 @@
#
# debugging output - ENV['SLAVE_DEBUG']=1 to enable
#
def trace
#--{{{
- STDERR.puts(yield) if @debug and STDERR.tty?
+ if @debug
+ STDERR.puts yield
+ STDERR.flush
+ end
#--}}}
end
+
#
- # the Heartbeat class is essentially wrapper over an IPC channel that sends
- # a ping on the channel indicating process health. if either end of the
- # channel is detached the ping will fail and an error will be raised. in
- # this way it is ensured that Slave objects cannot continue to live without
- # their parent being alive.
+ # a simple convenience method which returns an *object* from another
+ # process. the object returned is the result of the supplied block. eg
#
- class Heartbeat
+ # object = Slave.object{ processor_intensive_object_built_in_child_process() }
+ #
+ # eg.
+ #
+ # the call can be made asynchronous via the 'async'/:async keyword
+ #
+ # thread = Slave.object(:async=>true){ long_processor_intensive_object_built_in_child_process() }
+ #
+ # # go on about your coding business then, later
+ #
+ # object = thread.value
+ #
+ def self.object opts = {}, &b
#--{{{
- def initialize pulse_rate = 4.2, debug = false
-#--{{{
- @pulse_rate = Float pulse_rate
- @debug = debug
- @r, @w = IO::pipe
- @pid = Process::pid
- @ppid = Process::ppid
- @cid = nil
- @thread = nil
- @ppid = nil
- @whoami = nil
- @beating = nil
- @pipe = nil
+ l = lambda{ begin; b.call; ensure; exit; end }
+
+ async = opts.delete('async') || opts.delete(:async)
+
+ opts['object'] = opts[:object] = l
+ opts['dumped'] = opts[:dumped] = true
+
+ slave = Slave.new opts
+
+ async ? Thread.new{ slave.object.call } : slave.object.call
#--}}}
- end
- def start
+ end
+ def self.object opts = {}, &b
#--{{{
- if Process::pid == @pid
- @w.close
- @pipe = @r
- @pipe.fcntl Fcntl::F_SETFD, Fcntl::FD_CLOEXEC
- parent_start
- else
- @r.close
- @pipe = @w
- child_start
+ async = opts.delete('async') || opts.delete(:async)
+
+ opts['object'] = opts[:object] = lambda(&b)
+ opts['dumped'] = opts[:dumped] = true
+
+ slave = Slave.new opts
+
+ value = lambda do |slave|
+ begin
+ slave.object.call
+ ensure
+ slave.shutdown
end
- @beating = true
-#--}}}
end
- def parent_start
-#--{{{
- @whoami = 'parent'
- @thread =
- Thread::new(Thread::current) do |cur|
- begin
- loop do
- buf = @pipe.gets
- trace{ "<#{ @whoami }> <#{ @pid }> gets <#{ buf.inspect }>" }
- @cid = Integer buf.strip if @cid.nil? and buf =~ %r/^\s*\d+\s*$/
- end
- rescue => e
- cur.raise e
- ensure
- @pipe.close rescue nil
- end
- end
-#--}}}
- end
- def child_start
-#--{{{
- @whoami = 'child'
- @pid = Process::pid
- @ppid = Process::ppid
- @thread =
- Thread::new(Thread::current) do |cur|
- begin
- loop do
- trace{ "<#{ @whoami }> <#{ @pid }> puts <#{ @pid }>" }
- @pipe.puts @pid
- Process::kill 0, @ppid
- sleep @pulse_rate
- end
- rescue => e
- cur.raise e
- ensure
- @pipe.close rescue nil
- end
- end
-#--}}}
- end
- def start
-#--{{{
- if Process::pid == @pid
- @r.close
- @pipe = @w
- @pipe.fcntl Fcntl::F_SETFD, Fcntl::FD_CLOEXEC
- parent_start
- else
- @w.close
- @pipe = @r
- child_start
- end
- @beating = true
+ async ? Thread.new{ value[slave] } : value[slave]
#--}}}
- end
- def parent_start
-#--{{{
- @whoami = 'parent'
- @thread =
- Thread::new(Thread::current) do |cur|
- begin
- sleep
- rescue => e
- cur.raise e
- ensure
- @pipe.close rescue nil
- end
- end
-#--}}}
- end
- def child_start
-#--{{{
- @whoami = 'child'
- @pid = Process::pid
- @ppid = Process::ppid
- @thread =
- Thread::new(Thread::current) do |cur|
- begin
- trace{ "child reading..." }
- @pipe.read
- trace{ "child read." }
- trace{ "child exiting." }
- exit
- rescue => e
- cur.raise e
- ensure
- @pipe.close rescue nil
- end
- end
-#--}}}
- end
- def stop
-#--{{{
- raise "not beating" unless @beating
- @thread.kill
- @pipe.close rescue nil
- @beating = false
-#--}}}
- end
- def trace
-#--{{{
- STDERR.puts(yield) if @debug and STDERR.tty?
-#--}}}
- end
-#--}}}
- end # class Heartbeat
+ end
#--}}}
end # class Slave