In: lib/slave.rb
Parent: Object
the Slave class encapsulates the work of setting up a drb server in another process running on localhost via unix domain sockets. the slave process is attached to its parent via a LifeLine, which is designed such that the slave cannot outlive its parent and become a zombie, even if the parent dies an early death, such as by ‘kill -9’. the concept and purpose of the Slave class is to make it so easy to set up any server object in another process that a multi-process, drb/ipc based design is as easy as, or easier than, a multi-threaded one. eg
class Server
  def add_two n
    n + 2
  end
end

slave = Slave.new 'object' => Server.new
server = slave.object

p server.add_two(40) #=> 42
two other methods of providing server objects exist:
a) server = Server.new "this is called in the parent"
Slave.new(:object=>server){|s| puts "#{ s.inspect } passed to block in child process"}
b) Slave.new{ Server.new "this is called only in the child" }
of the two ‘b’ is preferred.
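the lifeline idea described above can be illustrated with a plain pipe. this is only a minimal sketch under stated assumptions, not the library's LifeLine class (whose source is not shown here); `lifeline_demo` is a hypothetical helper. the child blocks reading from a pipe whose only write end is held by the parent, so when the parent closes it, or dies and the kernel closes it on the parent's behalf, the child sees end-of-file and exits.

```ruby
# Hypothetical helper (not part of the slave library): forks a child that
# exits as soon as the parent's end of the pipe is closed.
# Returns the child's Process::Status.
def lifeline_demo
  reader, writer = IO.pipe

  pid = fork do
    writer.close   # the child keeps only the read end
    reader.read    # blocks until every write end is closed (EOF)
    exit!(0)       # the parent let go (or died): leave immediately
  end

  reader.close     # the parent keeps only the write end
  writer.close     # "cut" the lifeline; a dying parent does this implicitly
  Process.waitpid2(pid).last
end

status = lifeline_demo
puts "child exited cleanly: #{status.success?}"
```

because the kernel closes a dead process's descriptors, the same end-of-file arrives even when the parent is killed with ‘kill -9’ and never runs any cleanup code.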
VERSION | = | '1.1.0' |
DEFAULT_SOCKET_CREATION_ATTEMPTS | = | Integer(ENV['SLAVE_SOCKET_CREATION_ATTEMPTS'] || 42) | env config |
DEFAULT_DEBUG | = | (ENV['SLAVE_DEBUG'] ? true : false) |
DEFAULT_THREADSAFE | = | (ENV['SLAVE_THREADSAFE'] ? true : false) |
at_exit | [R] | |
debug | [RW] | if this is true and you are running from a terminal, information is printed on STDERR |
debug | [R] | |
dumped | [R] | |
obj | [R] | attrs |
object | [R] | |
pid | [R] | |
ppid | [R] | |
psname | [R] | |
shutdown | [R] | |
socket | [R] | |
socket_creation_attempts | [RW] | |
socket_creation_attempts | [R] | |
status | [R] | |
threadsafe | [RW] | if this is true all slave objects will be wrapped such that any call to the object is threadsafe. if you do not use this you must ensure that your objects are threadsafe yourself as this is required of any object acting as a drb server |
uri | [R] |
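the threadsafe attribute above says the served object is wrapped so that every call is serialized. the following is a minimal sketch of that idea, assuming a simple mutex-guarded proxy; `SynchronizedProxy` and `Counter` are hypothetical names, and the library's own ThreadSafe class may be implemented differently.

```ruby
# Hypothetical wrapper (not the library's ThreadSafe class): forwards every
# call to the wrapped object while holding a single lock.
class SynchronizedProxy
  def initialize(target)
    @target = target
    @lock = Mutex.new
  end

  def method_missing(name, *args, &block)
    return super unless @target.respond_to?(name)
    @lock.synchronize { @target.public_send(name, *args, &block) }
  end

  def respond_to_missing?(name, include_private = false)
    @target.respond_to?(name, include_private) || super
  end
end

# A deliberately racy object: increment is a read-modify-write with a
# thread switch in the middle.
class Counter
  attr_reader :count

  def initialize
    @count = 0
  end

  def increment
    current = @count
    Thread.pass
    @count = current + 1
  end
end

counter = SynchronizedProxy.new(Counter.new)
8.times.map { Thread.new { 100.times { counter.increment } } }.each(&:join)
puts counter.count
```

one lock around every call is coarse, but it matches the requirement stated above: a drb server object may be called from several threads at once, so either the object or a wrapper must make those calls safe.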
get a default value
# File lib/slave.rb, line 77
def default key
  send key
end
just fork without silly warnings
# File lib/slave.rb, line 98
def fork &block
  v = $VERBOSE
  begin
    $VERBOSE = nil
    Process::fork &block
  ensure
    $VERBOSE = v
  end
end
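the save, silence, restore pattern used by fork above works for any block. a minimal sketch, where `without_warnings` is a hypothetical helper name and not part of the library:

```ruby
# Hypothetical helper: runs the block with Ruby warnings silenced and
# restores the previous $VERBOSE setting afterwards, even if the block raises.
def without_warnings
  saved = $VERBOSE
  $VERBOSE = nil
  yield
ensure
  $VERBOSE = saved
end

inside = without_warnings { $VERBOSE }
puts "inside the block: #{inside.inspect}"
puts "afterwards: #{$VERBOSE.inspect}"
```

the `ensure` clause is what makes this safe to wrap around calls that may fail.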
# File lib/slave.rb, line 83
def getopts opts
  raise ArgumentError, opts.class unless
    opts.respond_to?('has_key?') and opts.respond_to?('[]')

  lambda do |key, *defval|
    defval = defval.shift
    keys = [key, key.to_s, key.to_s.intern]
    key = keys.detect{|k| opts.has_key? k } and break opts[key]
    defval
  end
end
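getopts returns a closure that looks an option up under its string or symbol spelling and falls back to a default. a standalone sketch of the same idea, written for illustration only; `option_reader` is a hypothetical name and the body is a restatement, not the library code:

```ruby
# Hypothetical restatement of the getopts idea: returns a lambda that finds
# a key given as either a string or a symbol, with an optional default.
def option_reader(opts)
  unless opts.respond_to?(:key?) && opts.respond_to?(:[])
    raise ArgumentError, opts.class.to_s
  end

  lambda do |key, default = nil|
    found = [key, key.to_s, key.to_s.to_sym].find { |k| opts.key?(k) }
    found.nil? ? default : opts[found]
  end
end

getopt = option_reader('debug' => true, :psname => 'worker')

p getopt[:debug]         # stored under the string key, found via the symbol
p getopt['psname']       # stored under the symbol key, found via the string
p getopt['missing', 42]  # not present, so the default is returned
```

this is why every option listed for Slave.new can be given as either a string or a symbol.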
sets up a child process serving any object as a DRb server running locally on unix domain sockets. the child process has a LifeLine established between it and the parent, making it impossible for the child to outlive the parent (become a zombie). the object to serve is specified either directly using the ‘object’/:object keyword
Slave.new :object => MyServer.new
or, preferably, using the block form
Slave.new{ MyServer.new }
when the block form is used the object is constructed in the child process itself. this is quite advantageous if the child object consumes resources or opens file handles (db connections, etc). by constructing the object in the child, any resources are consumed from the child’s address space, and things like open file handles will not be carried into subsequent child processes (via standard unix fork semantics). in the event that a block is specified but the object cannot be constructed and, instead, raises an Exception, that exception will be propagated to the parent process.
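the propagation of a construction failure can be sketched on its own. the source of initialize dumps the exception's class, message and backtrace with Marshal in the child and rebuilds the exception in the parent; the helper names `pack_failure` and `unpack_failure` below are hypothetical, and the transport between the two processes is left out.

```ruby
# Hypothetical helpers illustrating the dump/rebuild round trip.

# Child side: reduce the exception to plain, Marshal-able data.
def pack_failure(error)
  Marshal.dump([error.class, error.message, error.backtrace])
end

# Parent side: rebuild an exception of the same class with the same
# message and the child's backtrace.
def unpack_failure(payload)
  klass, message, backtrace = Marshal.load(payload)
  error = klass.new(message)
  error.set_backtrace(backtrace)
  error
end

payload =
  begin
    raise ArgumentError, 'could not build server'
  rescue Exception => e
    pack_failure(e)
  end

rebuilt = unpack_failure(payload)
puts "#{rebuilt.class}: #{rebuilt.message}"
```

the rebuilt exception carries the child's backtrace, so the parent sees where construction failed. this round trip assumes the exception class is named and can be built from a message alone.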
opts may contain the following keys, as either strings or symbols
object                   : specify the slave object. otherwise block value is used.
socket_creation_attempts : specify how many attempts to create a unix domain socket will be made
debug                    : turn on some logging to STDERR
psname                   : specify the name that will appear in 'top' ($0)
at_exit                  : specify a lambda to be called in the *parent* when the child dies
dumped                   : specify that the slave object should *not* be DRbUndumped (default is DRbUndumped)
threadsafe               : wrap the slave object with ThreadSafe to implement gross thread safety
# File lib/slave.rb, line 321
def initialize opts = {}, &block
  getopt = getopts opts

  @obj = getopt['object']
  @socket_creation_attempts = getopt['socket_creation_attempts'] || default('socket_creation_attempts')
  @debug = getopt['debug'] || default('debug')
  @psname = getopt['psname']
  @at_exit = getopt['at_exit']
  @dumped = getopt['dumped']
  @threadsafe = getopt['threadsafe'] || default('threadsafe')

  raise ArgumentError, 'no slave object or slave object block provided!' if
    @obj.nil? and block.nil?

  @shutdown = false
  @waiter = @status = nil
  @lifeline = LifeLine.new

  # weird syntax because dot/rdoc chokes on this!?!?
  init_failure = lambda do |e|
    trace{ %[#{ e.message } (#{ e.class })\n#{ e.backtrace.join "\n" }] }
    o = Object.new
    class << o
      attr_accessor '__slave_object_failure__'
    end
    o.__slave_object_failure__ = Marshal.dump [e.class, e.message, e.backtrace]
    @object = o
  end

  #
  # child
  #
  unless((@pid = Slave::fork))
    e = nil
    begin
      Kernel.at_exit{ Kernel.exit! }
      @lifeline.catch

      if @obj
        @object = @obj
      else
        begin
          @object = block.call
        rescue Exception => e
          init_failure[e]
        end
      end

      if block and @obj
        begin
          block[@obj]
        rescue Exception => e
          init_failure[e]
        end
      end

      $0 = (@psname ||= gen_psname(@object))

      unless @dumped or @object.respond_to?('__slave_object_failure__')
        @object.extend DRbUndumped
      end

      if @threadsafe
        @object = ThreadSafe.new @object
      end

      @ppid, @pid = Process::ppid, Process::pid
      @socket = nil
      @uri = nil

      tmpdir, basename = Dir::tmpdir, File::basename(@psname)

      @socket_creation_attempts.times do |attempt|
        se = nil
        begin
          s = File::join(tmpdir, "#{ basename }_#{ attempt }_#{ rand }")
          u = "drbunix://#{ s }"
          DRb::start_service u, @object
          @socket = s
          @uri = u
          trace{ "child - socket <#{ @socket }>" }
          trace{ "child - uri <#{ @uri }>" }
          break
        rescue Errno::EADDRINUSE => se
          nil
        end
      end

      if @socket and @uri
        trap('SIGUSR2') do
          DRb::thread.kill rescue nil
          FileUtils::rm_f @socket rescue nil
          exit
        end

        @lifeline.puts @socket
        @lifeline.cling
      else
        @lifeline.release
        warn "slave(#{ $$ }) could not create socket!"
        exit
      end
    rescue Exception => e
      trace{ %[#{ e.message } (#{ e.class })\n#{ e.backtrace.join "\n" }] }
    ensure
      status = e.respond_to?('status') ? e.status : 1
      exit(status)
    end
  #
  # parent
  #
  else
    detach
    @lifeline.throw

    buf = @lifeline.gets
    raise "failed to find slave socket" if buf.nil? or buf.strip.empty?
    @socket = buf.strip
    trace{ "parent - socket <#{ @socket }>" }

    if @at_exit
      @at_exit_thread = @lifeline.on_cut{
        @at_exit.respond_to?('call') ? @at_exit.call(self) : send(@at_exit.to_s, self)
      }
    end

    if @socket and File::exist? @socket
      Kernel.at_exit{ FileUtils::rm_f @socket }
      @uri = "drbunix://#{ socket }"
      trace{ "parent - uri <#{ @uri }>" }
      #
      # starting drb on localhost avoids dns lookups!
      #
      DRb::start_service('druby://localhost:0', nil) unless DRb::thread
      @object = DRbObject::new nil, @uri
      if @object.respond_to? '__slave_object_failure__'
        c, m, bt = Marshal.load @object.__slave_object_failure__
        (e = c.new(m)).set_backtrace bt
        trace{ %[#{ e.message } (#{ e.class })\n#{ e.backtrace.join "\n" }] }
        raise e
      end
      @psname ||= gen_psname(@object)
    else
      raise "failed to find slave socket <#{ @socket }>"
    end
  end
end
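the child/parent handshake in initialize can be sketched with the standard library alone. this is a simplified illustration under stated assumptions, not the Slave implementation: `with_child_server` and `Adder` are hypothetical names, a plain pipe stands in for the LifeLine, and the retry loop over socket names is omitted.

```ruby
require 'drb'
require 'drb/unix'
require 'tmpdir'

class Adder
  def add_two(n)
    n + 2
  end
end

# Hypothetical helper: builds an object in a forked child, serves it over a
# unix domain socket, and yields a DRb proxy for it in the parent.
def with_child_server(builder)
  socket = File.join(Dir.tmpdir, "slave_sketch_#{Process.pid}_#{rand(1_000_000)}")
  uri = "drbunix://#{socket}"
  reader, writer = IO.pipe

  pid = fork do
    begin
      reader.close
      DRb.start_service(uri, builder.call) # the object is constructed in the child
      writer.puts(uri)                     # tell the parent the socket is ready
      writer.close
      DRb.thread.join                      # serve until the parent kills us
    ensure
      exit!(1)                             # never fall back into the parent's code
    end
  end

  writer.close
  ready = reader.gets                      # nil if the child died before serving
  reader.close
  raise 'child failed to start' if ready.nil?

  yield DRbObject.new_with_uri(ready.strip)
ensure
  if pid
    Process.kill('KILL', pid) rescue nil
    Process.wait(pid) rescue nil
  end
  File.unlink(socket) if socket && File.exist?(socket)
end

result = with_child_server(-> { Adder.new }) { |adder| adder.add_two(40) }
puts result
```

as in initialize, the parent learns the socket path only after the child has started its service, so the proxy is never created against a socket that does not exist yet.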
a simple convenience method which returns an object from another process. the object returned is the result of the supplied block. eg
object = Slave.object{ processor_intensive_object_built_in_child_process() }
the call can be made asynchronous via the ‘async’/:async keyword, eg
thread = Slave.object(:async=>true){ long_processor_intensive_object_built_in_child_process() }
# go on about your coding business then, later
object = thread.value
# File lib/slave.rb, line 589
def self.object opts = {}, &b
  l = lambda{ begin; b.call; ensure; exit; end }

  async = opts.delete('async') || opts.delete(:async)

  opts['object'] = opts[:object] = l
  opts['dumped'] = opts[:dumped] = true

  slave = Slave.new opts

  async ? Thread.new{ slave.object.call } : slave.object.call
end
# File lib/slave.rb, line 603
def self.object opts = {}, &b
  async = opts.delete('async') || opts.delete(:async)

  opts['object'] = opts[:object] = lambda(&b)
  opts['dumped'] = opts[:dumped] = true

  slave = Slave.new opts

  value = lambda do |slave|
    begin
      slave.object.call
    ensure
      slave.shutdown
    end
  end

  async ? Thread.new{ value[slave] } : value[slave]
end
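the shape of Slave.object, compute a value in another process and optionally collect it through a thread, can be sketched without drb. the sketch below is an assumption-laden stand-in: `value_from_child` is a hypothetical helper that ships the result back over a pipe with Marshal, whereas Slave.object serves a lambda from the child over drb. error handling for a failing block is left out.

```ruby
# Hypothetical helper: runs the block in a forked child and returns its
# Marshal-able value; with async: true it returns a Thread whose value is
# that result.
def value_from_child(async: false, &block)
  reader, writer = IO.pipe
  reader.binmode
  writer.binmode

  pid = fork do
    begin
      reader.close
      writer.write(Marshal.dump(block.call)) # compute in the child
      writer.close
    ensure
      exit!(0)                               # skip the parent's at_exit handlers
    end
  end

  writer.close

  collect = lambda do
    begin
      Marshal.load(reader.read)              # blocks until the child is done
    ensure
      reader.close
      Process.wait(pid)                      # reap the child
    end
  end

  async ? Thread.new { collect.call } : collect.call
end

thread = value_from_child(async: true) { (1..10).reduce(:+) }
# go on about your coding business then, later
puts thread.value
puts value_from_child { 6 * 7 }
```

as with Slave.object, the asynchronous form hands back a thread immediately and the caller blocks only when it asks for `thread.value`.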
see docs for Slave.default
# File lib/slave.rb, line 548
def default key
  self.class.default key
end
starts a thread to collect the child status and sets up at_exit handler to prevent zombies. the at_exit handler is canceled if the thread is able to collect the status
# File lib/slave.rb, line 475
def detach
  reap = lambda do |cid|
    begin
      @status = Process::waitpid2(cid).last
    rescue Exception => e
      m, c, b = e.message, e.class, e.backtrace.join("\n")
      warn "#{ m } (#{ c })\n#{ b }" unless e.is_a? Errno::ECHILD
    end
  end

  Kernel.at_exit do
    shutdown rescue nil
    reap[@pid] rescue nil
  end

  @waiter =
    Thread.new do
      begin
        @status = Process::waitpid2(@pid).last
      ensure
        reap = lambda{|cid| 'no-op' }
      end
    end
end
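the waiter thread in detach boils down to calling Process.waitpid2 off the main thread. a minimal sketch, with `reap_in_background` as a hypothetical helper name:

```ruby
# Hypothetical helper: collects a child's exit status in a background
# thread so the main thread is never blocked; the thread's value is the
# child's Process::Status.
def reap_in_background(pid)
  Thread.new { Process.waitpid2(pid).last }
end

pid = fork { exit!(3) }
waiter = reap_in_background(pid)

# ... the parent is free to do other work here ...

status = waiter.value
puts "child #{status.pid} exited with #{status.exitstatus}"
```

once the status has been collected the kernel can discard the child's process table entry, which is what keeps it from lingering as a zombie.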
generate a default name to appear in ps/top
# File lib/slave.rb, line 540
def gen_psname obj
  "slave_#{ obj.class }_#{ obj.object_id }_#{ Process::ppid }_#{ Process::pid }".downcase.gsub(%r/\s+/,'_')
end
see docs for Slave.getopts
# File lib/slave.rb, line 556
def getopts opts
  self.class.getopts opts
end
cuts the lifeline and kills the child process - give the key ‘quiet’ to ignore errors shutting down, including having already been shut down
# File lib/slave.rb, line 519
def shutdown opts = {}
  quiet = getopts(opts)['quiet']
  raise "already shutdown" if @shutdown unless quiet
  begin; Process::kill 'SIGUSR2', @pid; rescue Exception => e; end
  begin; @lifeline.cut; rescue Exception; end
  raise e if e unless quiet
  @shutdown = true
end
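the signal half of shutdown can be tried in isolation. in the sketch below, `signal_shutdown_demo` is a hypothetical helper: the child installs a SIGUSR2 handler, reports that it is ready, and the parent then signals it and collects its status. the lifeline cut and the ‘quiet’ option are not modelled.

```ruby
# Hypothetical helper: forks a child that exits when it receives SIGUSR2,
# signals it, and returns the child's Process::Status.
def signal_shutdown_demo
  reader, writer = IO.pipe

  pid = fork do
    reader.close
    trap('USR2') { exit!(0) } # installed before the parent is told to go ahead
    writer.puts 'ready'
    writer.close
    sleep                     # stand-in for serving requests until signalled
  end

  writer.close
  reader.gets                 # wait until the child's handler is in place
  reader.close

  Process.kill('USR2', pid)
  Process.waitpid2(pid).last
end

status = signal_shutdown_demo
puts "child shut down cleanly: #{status.success?}"
```

waiting for the ready message matters: a SIGUSR2 delivered before the handler is installed would kill the child with the default action instead of letting it clean up.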
true if the slave has been shut down
# File lib/slave.rb, line 532
def shutdown?
  @shutdown
end
debugging output - ENV[‘SLAVE_DEBUG’]=1 to enable
# File lib/slave.rb, line 564
def trace
  if @debug
    STDERR.puts yield
    STDERR.flush
  end
end
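trace takes its message as a block, so the message is only built when debugging is on. a small sketch of that design, with a hypothetical `Tracer` class that writes to any IO-like object instead of STDERR:

```ruby
require 'stringio'

# Hypothetical class illustrating block-based (lazy) debug output.
class Tracer
  def initialize(debug:, out: $stderr)
    @debug = debug
    @out = out
  end

  # The block is evaluated only when debugging is enabled.
  def trace
    return unless @debug
    @out.puts yield
    @out.flush
  end
end

built = 0
log = StringIO.new

Tracer.new(debug: false, out: log).trace { built += 1; 'expensive message' }
Tracer.new(debug: true,  out: log).trace { built += 1; 'child - socket ready' }

puts "messages built: #{built}"
puts log.string
```

with debugging off, the cost of a trace call is a single instance variable check, however expensive the message would be to format.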
wait for slave to finish. if the keyword ‘non_block’=>true is given a thread is returned to do the waiting in an async fashion. eg
thread = slave.wait(:non_block=>true){|value| "background <#{ value }>"}
# File lib/slave.rb, line 507
def wait opts = {}, &b
  b ||= lambda{|exit_status|}
  non_block = getopts(opts)['non_block']
  non_block ? Thread.new{ b[ @waiter.value ] } : b[ @waiter.value ]
end