Class: Slave
In: lib/slave.rb
Parent: Object
the Slave class encapsulates the work of setting up a drb server in another process running on localhost via unix domain sockets. the slave process is attached to its parent via a LifeLine, which is designed such that the slave cannot outlive its parent and become a zombie, even if the parent dies an early death, such as by 'kill -9'. the concept and purpose of the Slave class is to be able to set up any server object in another process so easily that using a multi-process, drb/ipc based design is as easy as, or easier than, a multi-threaded one. eg
class Server
  def add_two n
    n + 2
  end
end

slave = Slave.new 'object' => Server.new
server = slave.object

p server.add_two(40) #=> 42
two other methods of providing server objects exist:
a) server = Server.new "this is called in the parent"
   Slave.new(:object=>server){|s| puts "#{ s.inspect } passed to block in child process"}
b) Slave.new{ Server.new "this is called only in the child" }
of the two, 'b' is preferred.
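the lifeline idea described above can be illustrated with a plain pipe. this is a minimal sketch under stated assumptions, not the library's LifeLine implementation: it assumes a unix-like platform with fork, and lifeline_demo is a made-up name. the child blocks reading its end of the pipe; once every copy of the write end is closed (which the kernel also does when the parent is killed), the read returns EOF and the child exits.

```ruby
# hypothetical sketch: a pipe used as a lifeline between parent and child.
# this is NOT how Slave::LifeLine is implemented; it only shows the idea.

def lifeline_demo(child_exit_code = 42)
  reader, writer = IO.pipe

  pid = fork do
    writer.close               # child keeps only the read end
    reader.read                # blocks until the parent's write end is closed
    exit!(child_exit_code)     # EOF seen: the parent let go (or died)
  end

  reader.close                 # parent keeps only the write end
  sleep 0.1                    # give the child time to block on the lifeline
  writer.close                 # "cut" the lifeline

  _, status = Process.waitpid2(pid)
  status.exitstatus
end

puts "child exited with #{ lifeline_demo } after the lifeline was cut"
```

because the kernel closes a dead process's descriptors, the same EOF arrives even when the parent is killed with 'kill -9', which is what makes this pattern robust.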
VERSION | = | '1.2.1' |
DEFAULT_SOCKET_CREATION_ATTEMPTS | = | Integer(ENV['SLAVE_SOCKET_CREATION_ATTEMPTS'] || 42) | env config
DEFAULT_DEBUG | = | (ENV['SLAVE_DEBUG'] ? true : false) |
DEFAULT_THREADSAFE | = | (ENV['SLAVE_THREADSAFE'] ? true : false) |
at_exit | [R] | |
debug | [RW] | if this is true and you are running from a terminal, information is printed on STDERR |
debug | [R] | |
dumped | [R] | |
obj | [R] | attrs |
object | [R] | |
pid | [R] | |
ppid | [R] | |
psname | [R] | |
socket | [R] | |
socket_creation_attempts | [RW] | |
socket_creation_attempts | [R] | |
status | [R] | |
threadsafe | [RW] | if this is true, all slave objects will be wrapped such that any call to the object is threadsafe. if you do not use this, you must ensure that your objects are threadsafe yourself, as this is required of any object acting as a drb server |
uri | [R] |
# File lib/slave.rb, line 83
def getopts opts
  #--{{{
  raise ArgumentError, opts.class unless
    opts.respond_to?('has_key?') and opts.respond_to?('[]')

  lambda do |key, *defval|
    defval = defval.shift
    keys = [key, key.to_s, key.to_s.intern]
    key = keys.detect{|k| opts.has_key? k } and break opts[key]
    defval
  end
  #--}}}
end
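the listing above returns a lookup closure that accepts a key as either a string or a symbol, with an optional default. the following standalone sketch shows the same idea in isolation; option_reader is a made-up name and the code is an approximation for illustration, not the library's method.

```ruby
# standalone sketch of the lookup closure; option_reader is a made-up name,
# not part of the library.

def option_reader(opts)
  unless opts.respond_to?(:key?) && opts.respond_to?(:[])
    raise ArgumentError, opts.class.to_s
  end

  lambda do |key, default = nil|
    # try the key as given, then as a string, then as a symbol
    candidates = [key, key.to_s, key.to_s.to_sym]
    found = candidates.find { |k| opts.key?(k) }
    found.nil? ? default : opts[found]
  end
end

getopt = option_reader('debug' => true, :psname => 'worker')

p getopt['debug']        # stored under a string key
p getopt[:debug]         # the symbol finds the same entry
p getopt['psname']       # stored under a symbol key
p getopt['missing', 42]  # falls back to the supplied default
```

this is why the documented options can be given as either 'object' or :object without the caller caring which form was used.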
sets up a child process serving any object as a DRb server running locally on unix domain sockets. the child process has a LifeLine established between it and the parent, making it impossible for the child to outlive the parent (become a zombie). the object to serve is specified either directly using the 'object'/:object keyword
Slave.new :object => MyServer.new
or, preferably, using the block form
Slave.new{ MyServer.new }
when the block form is used the object is constructed in the child process itself. this is quite advantageous if the child object consumes resources or opens file handles (db connections, etc). by constructing the object in the child, any resources are consumed from the child's address space and things like open file handles will not be carried into subsequent child processes (via standard unix fork semantics). in the event that a block is specified but the object cannot be constructed and, instead, raises an Exception, that exception will be propagated to the parent process.
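the propagation of a construction failure can be sketched with fork, a pipe and Marshal. the initialize source on this page marshals the triple [e.class, e.message, e.backtrace] and rebuilds the exception in the parent; the sketch below borrows only that triple. everything else is an assumption for illustration: build_in_child is a made-up helper, and it ships the data over a pipe rather than over drb as the library does.

```ruby
# hypothetical sketch: shipping a child's construction failure to the parent.
# build_in_child is a made-up helper, not part of the library.

def build_in_child
  reader, writer = IO.pipe

  pid = fork do
    reader.close
    payload =
      begin
        [:ok, yield]
      rescue Exception => e
        # a named class, a string and an array of strings all survive Marshal
        [:error, e.class, e.message, e.backtrace]
      end
    writer.write Marshal.dump(payload)
    writer.close
    exit! 0
  end

  writer.close
  payload = Marshal.load(reader.read)
  reader.close
  Process.waitpid(pid)

  tag, *rest = payload
  return rest.first if tag == :ok

  # rebuild the exception in the parent and raise it there
  klass, message, backtrace = rest
  error = klass.new(message)
  error.set_backtrace(backtrace)
  raise error
end

begin
  build_in_child { raise ArgumentError, 'could not open database' }
rescue ArgumentError => e
  puts "parent saw: #{ e.message } (#{ e.class })"
end
```

the backtrace that reaches the parent is the child's, so the failure still points at the line inside the block that raised.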
opts may contain the following keys, as either strings or symbols
object : specify the slave object. otherwise block value is used.
socket_creation_attempts : specify how many attempts to create a unix domain socket will be made
debug : turn on some logging to STDERR
psname : specify the name that will appear in 'top' ($0)
at_exit : specify a lambda to be called in the *parent* when the child dies
dumped : specify that the slave object should *not* be DRbUndumped (default is DRbUndumped)
threadsafe : wrap the slave object with ThreadSafe to implement gross thread safety
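the 'threadsafe' option wraps the served object so that calls are serialized. a minimal sketch of that kind of wrapper follows. SerializedProxy, Counter and hammer are made-up names, and the library's own ThreadSafe class may differ in detail; the sketch only shows the general technique of funnelling every call through one lock.

```ruby
# hypothetical sketch of "gross" thread safety: every call goes through one
# lock. SerializedProxy and Counter are made-up names; the library's own
# ThreadSafe class may differ in detail.

class SerializedProxy
  def initialize(target)
    @target = target
    @lock = Mutex.new
  end

  # forward any unknown method to the target while holding the lock
  def method_missing(name, *args, &block)
    return super unless @target.respond_to?(name)
    @lock.synchronize { @target.public_send(name, *args, &block) }
  end

  def respond_to_missing?(name, include_private = false)
    @target.respond_to?(name, include_private) || super
  end
end

class Counter
  attr_reader :count

  def initialize
    @count = 0
  end

  def increment
    current = @count
    Thread.pass            # invite a context switch in the middle of the update
    @count = current + 1
  end
end

# run several threads against the same counter and report the final count
def hammer(counter, threads: 8, per_thread: 250)
  workers = Array.new(threads) do
    Thread.new { per_thread.times { counter.increment } }
  end
  workers.each(&:join)
  counter.count
end

puts hammer(SerializedProxy.new(Counter.new))
```

a single Mutex is not reentrant, so a target that calls back into its own proxy would deadlock in this sketch; a Monitor would be the usual alternative in that case.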
# File lib/slave.rb, line 319
def initialize opts = {}, &block
  #--{{{
  getopt = getopts opts

  @obj = getopt['object']
  @socket_creation_attempts = getopt['socket_creation_attempts'] || default('socket_creation_attempts')
  @debug = getopt['debug'] || default('debug')
  @psname = getopt['psname']
  @at_exit = getopt['at_exit']
  @dumped = getopt['dumped']
  @threadsafe = getopt['threadsafe'] || default('threadsafe')

  raise ArgumentError, 'no slave object or slave object block provided!' if
    @obj.nil? and block.nil?

  @shutdown = false
  @waiter = @status = nil
  @lifeline = LifeLine.new

  # weird syntax because dot/rdoc chokes on this!?!?
  init_failure = lambda do |e|
    trace{ %Q[#{ e.message } (#{ e.class })\n#{ e.backtrace.join "\n" }] }
    o = Object.new
    class << o
      attr_accessor '__slave_object_failure__'
    end
    o.__slave_object_failure__ = Marshal.dump [e.class, e.message, e.backtrace]
    @object = o
  end

  #
  # child
  #
  unless((@pid = Slave::fork))
    e = nil
    begin
      Kernel.at_exit{ Kernel.exit! }
      @lifeline.catch

      if @obj
        @object = @obj
      else
        begin
          @object = block.call
        rescue Exception => e
          init_failure[e]
        end
      end

      if block and @obj
        begin
          block[@obj]
        rescue Exception => e
          init_failure[e]
        end
      end

      $0 = (@psname ||= gen_psname(@object))

      unless @dumped or @object.respond_to?('__slave_object_failure__')
        @object.extend DRbUndumped
      end

      if @threadsafe
        @object = ThreadSafe.new @object
      end

      @ppid, @pid = Process::ppid, Process::pid
      @socket = nil
      @uri = nil

      tmpdir, basename = Dir::tmpdir, File::basename(@psname)

      @socket_creation_attempts.times do |attempt|
        se = nil
        begin
          s = File::join(tmpdir, "#{ basename }_#{ attempt }_#{ rand }")
          u = "drbunix://#{ s }"
          DRb::start_service u, @object
          @socket = s
          @uri = u
          trace{ "child - socket <#{ @socket }>" }
          trace{ "child - uri <#{ @uri }>" }
          break
        rescue Errno::EADDRINUSE => se
          nil
        end
      end

      if @socket and @uri
        trap('SIGUSR2') do
          DBb::thread.kill rescue nil
          FileUtils::rm_f @socket rescue nil
          exit
        end

        @lifeline.puts @socket
        @lifeline.cling
      else
        @lifeline.release
        warn "slave(#{ $$ }) could not create socket!"
        exit
      end
    rescue Exception => e
      trace{ %Q[#{ e.message } (#{ e.class })\n#{ e.backtrace.join "\n" }] }
    ensure
      status = e.respond_to?('status') ? e.status : 1
      exit(status)
    end
  #
  # parent
  #
  else
    detach
    @lifeline.throw

    buf = @lifeline.gets
    raise "failed to find slave socket" if buf.nil? or buf.strip.empty?
    @socket = buf.strip
    trace{ "parent - socket <#{ @socket }>" }

    if @at_exit
      @at_exit_thread = @lifeline.on_cut{
        @at_exit.respond_to?('call') ? @at_exit.call(self) : send(@at_exit.to_s, self)
      }
    end

    if @socket and File::exist? @socket
      Kernel.at_exit{ FileUtils::rm_f @socket }
      @uri = "drbunix://#{ socket }"
      trace{ "parent - uri <#{ @uri }>" }
      #
      # starting drb on localhost avoids dns lookups!
      #
      DRb::start_service('druby://localhost:0', nil) unless DRb::thread
      @object = DRbObject::new nil, @uri
      if @object.respond_to? '__slave_object_failure__'
        c, m, bt = Marshal.load @object.__slave_object_failure__
        (e = c.new(m)).set_backtrace bt
        trace{ %Q[#{ e.message } (#{ e.class })\n#{ e.backtrace.join "\n" }] }
        raise e
      end
      @psname ||= gen_psname(@object)
    else
      raise "failed to find slave socket <#{ @socket }>"
    end
  end
  #--}}}
end
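the socket creation loop in the listing above tries a fresh randomized path on each attempt and moves on when the address is already in use. a reduced sketch of that loop, using ruby's drb library with the drbunix protocol, might look like the following. serve_on_unix_socket, remote_add_two and Adder are made-up names, and the whole thing runs in a single process for brevity, so drb may answer the call directly without crossing the socket.

```ruby
require 'drb'
require 'drb/unix'
require 'tmpdir'
require 'fileutils'

# hypothetical sketch of the "try a few socket names" loop, run in a single
# process for brevity. serve_on_unix_socket and Adder are made-up names.

class Adder
  def add_two(n)
    n + 2
  end
end

def serve_on_unix_socket(object, attempts: 5, basename: 'sketch')
  attempts.times do |attempt|
    name = "#{ basename }_#{ Process.pid }_#{ attempt }_#{ rand(1_000_000) }"
    path = File.join(Dir.tmpdir, name)
    uri = "drbunix:#{ path }"
    begin
      server = DRb.start_service(uri, object)
      return [server, uri, path]
    rescue Errno::EADDRINUSE
      next   # the name is taken, try another one
    end
  end
  raise "could not create socket after #{ attempts } attempts"
end

def remote_add_two(n)
  server, uri, path = serve_on_unix_socket(Adder.new)
  DRbObject.new_with_uri(uri).add_two(n)
ensure
  # stop serving and remove the socket file, whatever happened above
  server.stop_service if server
  FileUtils.rm_f(path) if path
end

puts remote_add_two(40)
```

including the attempt number and a random component in the path keeps collisions rare, so the retry loop is a safety net rather than the common path.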
a simple convenience method which returns an object from another process. the object returned is the result of the supplied block. eg
object = Slave.object{ processor_intensive_object_built_in_child_process() }
the call can be made asynchronous via the 'async'/:async keyword
thread = Slave.object(:async=>true){ long_processor_intensive_object_built_in_child_process() }

# go on about your coding business then, later

object = thread.value
# File lib/slave.rb, line 587
def self.object opts = {}, &b
  #--{{{
  async = opts.delete('async') || opts.delete(:async)

  opts['object'] = opts[:object] = lambda(&b)
  opts['dumped'] = opts[:dumped] = true

  slave = Slave.new opts

  value = lambda do |slave|
    begin
      slave.object.call
    ensure
      slave.shutdown
    end
  end

  async ? Thread.new{ value[slave] } : value[slave]
  #--}}}
end
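the last line of the listing above is the whole sync/async switch: the same lambda is either called directly or wrapped in a Thread whose value is collected later. the sketch below isolates that switch. compute is a made-up name, and for simplicity it runs the block in a thread of the current process rather than in a child process.

```ruby
# hypothetical sketch of the sync/async switch; compute is a made-up name and
# runs the block in a thread of the current process rather than in a child.

def compute(opts = {}, &block)
  async = opts[:async] || opts['async']
  work = lambda { block.call }
  async ? Thread.new { work.call } : work.call
end

now   = compute { 6 * 7 }                   # returns the value itself
later = compute(:async => true) { 6 * 7 }   # returns a Thread

# go on about your coding business then, later
puts now
puts later.value                            # blocks until the thread is done
```

Thread#value joins the thread and re-raises any exception the block raised, so errors are not silently lost in the async form.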
see docs for Slave.default
# File lib/slave.rb, line 546
def default key
  #--{{{
  self.class.default key
  #--}}}
end
starts a thread to collect the child status and sets up an at_exit handler to prevent zombies. the at_exit handler is canceled if the thread is able to collect the status
# File lib/slave.rb, line 473
def detach
  #--{{{
  reap = lambda do |cid|
    begin
      @status = Process::waitpid2(cid).last
    rescue Exception => e
      m, c, b = e.message, e.class, e.backtrace.join("\n")
      warn "#{ m } (#{ c })\n#{ b }" unless e.is_a? Errno::ECHILD
    end
  end

  Kernel.at_exit do
    shutdown rescue nil
    reap[@pid] rescue nil
  end

  @waiter =
    Thread.new do
      begin
        @status = Process::waitpid2(@pid).last
      ensure
        reap = lambda{|cid| 'no-op' }
      end
    end
  #--}}}
end
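the waiter thread in the listing above can be tried in isolation. the following is a minimal sketch, assuming a unix-like platform with fork; spawn_and_watch is a made-up name and the at_exit safety net from the listing is left out.

```ruby
# hypothetical sketch: a waiter thread that collects a child's exit status.
# spawn_and_watch is a made-up name; fork requires a unix-like platform.

def spawn_and_watch(exit_code)
  pid = fork { exit!(exit_code) }

  # waits in the background until the child exits, so the child never lingers
  # as a zombie; the thread's value is a Process::Status
  waiter = Thread.new { Process.waitpid2(pid).last }

  [pid, waiter]
end

pid, waiter = spawn_and_watch(3)
status = waiter.value
puts "child #{ pid } exited with status #{ status.exitstatus }"
```

ruby's Process.detach(pid) offers the same pattern out of the box: it returns a thread that reaps the child and whose value is the child's Process::Status.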
see docs for Slave.getopts
# File lib/slave.rb, line 554
def getopts opts
  #--{{{
  self.class.getopts opts
  #--}}}
end
cuts the lifeline and kills the child process. give the key 'quiet' to ignore errors while shutting down, including the slave having already been shut down
# File lib/slave.rb, line 517
def shutdown opts = {}
  #--{{{
  quiet = getopts(opts)['quiet']
  raise "already shutdown" if @shutdown unless quiet
  begin; Process::kill 'SIGUSR2', @pid; rescue Exception => e; end
  begin; @lifeline.cut; rescue Exception; end
  raise e if e unless quiet
  @shutdown = true
  #--}}}
end
true if the slave has been shut down
# File lib/slave.rb, line 530
def shutdown?
  #--{{{
  @shutdown
  #--}}}
end
debugging output. set ENV['SLAVE_DEBUG'] (any value, eg 1) or pass the 'debug' option to enable
# File lib/slave.rb, line 562
def trace
  #--{{{
  if @debug
    STDERR.puts yield
    STDERR.flush
  end
  #--}}}
end
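because trace takes a block, the message is only built when debugging is on. the sketch below shows that property with a stand-in class. Tracer, expensive_message and trace_demo are made-up names, and the output goes to a StringIO instead of STDERR so the result can be inspected.

```ruby
require 'stringio'

# hypothetical sketch: Tracer is a made-up class showing why trace takes a
# block. the message is only built when debugging is switched on.

class Tracer
  def initialize(debug, io = $stderr)
    @debug = debug
    @io = io
  end

  def trace
    return unless @debug
    @io.puts yield
    @io.flush
  end
end

# stands in for a costly string; records each time it is actually built
def expensive_message(calls)
  calls << :built
  'state dump'
end

# returns how often the message was built and what was written
def trace_demo(debug)
  calls = []
  out = StringIO.new
  Tracer.new(debug, out).trace { expensive_message(calls) }
  [calls.length, out.string]
end

p trace_demo(false)   # debugging off: the block never runs
p trace_demo(true)    # debugging on: the block runs once
```

this keeps calls such as trace{ "child - socket <...>" } essentially free in normal operation.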
wait for slave to finish. if the keyword 'non_block'=>true is given, a thread is returned to do the waiting in an async fashion. eg
thread = slave.wait(:non_block=>true){|value| "background <#{ value }>"}
# File lib/slave.rb, line 505
def wait opts = {}, &b
  #--{{{
  b ||= lambda{|exit_status|}
  non_block = getopts(opts)['non_block']
  non_block ? Thread.new{ b[ @waiter.value ] } : b[ @waiter.value ]
  #--}}}
end