Class Slave
In: lib/slave.rb
Parent: Object
Slave dot/f_1.jpg

the Slave class encapsulates the work of setting up a drb server in another process running on localhost via unix domain sockets. the slave process is attached to it‘s parent via a LifeLine which is designed such that the slave cannot out-live it‘s parent and become a zombie, even if the parent dies and early death, such as by ‘kill -9’. the concept and purpose of the Slave class is to be able to setup any server object in another process so easily that using a multi-process, drb/ipc, based design is as easy, or easier, than a multi-threaded one. eg

  class Server
    def add_two n
      n + 2
    end
  end

  slave = Slave.new 'object' => Server.new
  server = slave.object

  p server.add_two(40) #=> 42

two other methods of providing server objects exist:

a) server = Server.new "this is called the parent" }

   Slave.new(:object=>server){|s| puts "#{ s.inspect } passed to block in child process"}

b) Slave.new{ Server.new "this is called only in the child" }

of the two ‘b’ is preferred.

Methods

default   default   detach   fork   gen_psname   getopts   getopts   new   object   shutdown   shutdown?   trace   version   wait   wait2  

Classes and Modules

Class Slave::LifeLine
Class Slave::ThreadSafe
Class Slave::ThreadSafeHash

Constants

VERSION = '1.2.1'
DEFAULT_SOCKET_CREATION_ATTEMPTS = Integer(ENV['SLAVE_SOCKET_CREATION_ATTEMPTS'] || 42)   env config
DEFAULT_DEBUG = (ENV['SLAVE_DEBUG'] ? true : false)
DEFAULT_THREADSAFE = (ENV['SLAVE_THREADSAFE'] ? true : false)

Attributes

at_exit  [R] 
debug  [RW]  if this is true and you are running from a terminal information is printed on STDERR
debug  [R] 
dumped  [R] 
obj  [R]  attrs
object  [R] 
pid  [R] 
ppid  [R] 
psname  [R] 
socket  [R] 
socket_creation_attempts  [RW] 
socket_creation_attempts  [R] 
status  [R] 
threadsafe  [RW]  if this is true all slave objects will be wrapped such that any call to the object is threadsafe. if you do not use this you must ensure that your objects are threadsafe yourself as this is required of any object acting as a drb server
uri  [R] 

Public Class methods

get a default value

[Source]

    # File lib/slave.rb, line 77
77:       def default key
78: #--{{{
79:         send key
80: #--}}}
81:       end

just fork with out silly warnings

[Source]

     # File lib/slave.rb, line 98
 98:       def fork &b
 99: #--{{{
100:         v = $VERBOSE
101:         begin
102:           $VERBOSE = nil
103:           Process::fork(&b)
104:         ensure
105:         $VERBOSE = v
106:         end
107: #--}}}
108:       end

[Source]

    # File lib/slave.rb, line 83
83:       def getopts opts
84: #--{{{
85:         raise ArgumentError, opts.class unless
86:           opts.respond_to?('has_key?') and opts.respond_to?('[]')
87: 
88:         lambda do |key, *defval|
89:           defval = defval.shift
90:           keys = [key, key.to_s, key.to_s.intern]
91:           key = keys.detect{|k| opts.has_key? k } and break opts[key]
92:           defval
93:         end
94: #--}}}
95:       end

sets up a child process serving any object as a DRb server running locally on unix domain sockets. the child process has a LifeLine established between it and the parent, making it impossible for the child to outlive the parent (become a zombie). the object to serve is specfied either directly using the ‘object’/:object keyword

  Slave.new :object => MyServer.new

or, preferably, using the block form

  Slave.new{ MyServer.new }

when the block form is used the object is contructed in the child process itself. this is quite advantageous if the child object consumes resources or opens file handles (db connections, etc). by contructing the object in the child any resources are consumed from the child‘s address space and things like open file handles will not be carried into subsequent child processes (via standard unix fork semantics). in the event that a block is specified but the object cannot be constructed and, instead, throws and Exception, that exception will be propogated to the parent process.

opts may contain the following keys, as either strings or symbols

  object : specify the slave object.  otherwise block value is used.
  socket_creation_attempts : specify how many attempts to create a unix domain socket will be made
  debug : turn on some logging to STDERR
  psname : specify the name that will appear in 'top' ($0)
  at_exit : specify a lambda to be called in the *parent* when the child dies
  dumped : specify that the slave object should *not* be DRbUndumped (default is DRbUndumped)
  threadsafe : wrap the slave object with ThreadSafe to implement gross thread safety

[Source]

     # File lib/slave.rb, line 319
319:     def initialize opts = {}, &block
320: #--{{{
321:       getopt = getopts opts
322: 
323:       @obj = getopt['object']
324:       @socket_creation_attempts = getopt['socket_creation_attempts'] || default('socket_creation_attempts')
325:       @debug = getopt['debug'] || default('debug')
326:       @psname = getopt['psname']
327:       @at_exit = getopt['at_exit']
328:       @dumped = getopt['dumped']
329:       @threadsafe = getopt['threadsafe'] || default('threadsafe')
330: 
331:       raise ArgumentError, 'no slave object or slave object block provided!' if 
332:         @obj.nil? and block.nil?
333: 
334:       @shutdown = false
335:       @waiter = @status = nil
336:       @lifeline = LifeLine.new
337: 
338:       # weird syntax because dot/rdoc chokes on this!?!?
339:       init_failure = lambda do |e|
340:         trace{ %Q[#{ e.message } (#{ e.class })\n#{ e.backtrace.join "\n" }] }
341:         o = Object.new
342:         class << o
343:           attr_accessor '__slave_object_failure__'
344:         end
345:         o.__slave_object_failure__ = Marshal.dump [e.class, e.message, e.backtrace]
346:         @object = o
347:       end
348: 
349:     #
350:     # child
351:     #
352:       unless((@pid = Slave::fork))
353:         e = nil
354:         begin
355:           Kernel.at_exit{ Kernel.exit! }
356:           @lifeline.catch
357: 
358:           if @obj
359:             @object = @obj
360:           else
361:             begin
362:               @object = block.call 
363:             rescue Exception => e
364:               init_failure[e]
365:             end
366:           end
367: 
368:           if block and @obj
369:             begin
370:               block[@obj]
371:             rescue Exception => e
372:               init_failure[e]
373:             end
374:           end
375: 
376:           $0 = (@psname ||= gen_psname(@object))
377: 
378:           unless @dumped or @object.respond_to?('__slave_object_failure__')
379:             @object.extend DRbUndumped
380:           end
381: 
382:           if @threadsafe
383:             @object = ThreadSafe.new @object
384:           end
385: 
386:           @ppid, @pid = Process::ppid, Process::pid
387:           @socket = nil
388:           @uri = nil
389: 
390:           tmpdir, basename = Dir::tmpdir, File::basename(@psname)
391: 
392:           @socket_creation_attempts.times do |attempt|
393:             se = nil
394:             begin
395:               s = File::join(tmpdir, "#{ basename }_#{ attempt }_#{ rand }")
396:               u = "drbunix://#{ s }"
397:               DRb::start_service u, @object 
398:               @socket = s
399:               @uri = u
400:               trace{ "child - socket <#{ @socket }>" }
401:               trace{ "child - uri <#{ @uri }>" }
402:               break
403:             rescue Errno::EADDRINUSE => se
404:               nil
405:             end
406:           end
407: 
408:           if @socket and @uri
409:             trap('SIGUSR2') do
410:               DBb::thread.kill rescue nil
411:               FileUtils::rm_f @socket rescue nil
412:               exit
413:             end
414: 
415:             @lifeline.puts @socket 
416:             @lifeline.cling
417:           else
418:             @lifeline.release
419:             warn "slave(#{ $$ }) could not create socket!"
420:             exit
421:           end
422:         rescue Exception => e
423:           trace{ %Q[#{ e.message } (#{ e.class })\n#{ e.backtrace.join "\n" }] }
424:         ensure
425:           status = e.respond_to?('status') ? e.status : 1
426:           exit(status)
427:         end
428:     #
429:     # parent 
430:     #
431:       else
432:         detach
433:         @lifeline.throw
434: 
435:         buf = @lifeline.gets
436:         raise "failed to find slave socket" if buf.nil? or buf.strip.empty?
437:         @socket = buf.strip
438:         trace{ "parent - socket <#{ @socket }>" }
439: 
440:         if @at_exit
441:           @at_exit_thread = @lifeline.on_cut{ 
442:             @at_exit.respond_to?('call') ? @at_exit.call(self) : send(@at_exit.to_s, self)
443:           }
444:         end
445: 
446:         if @socket and File::exist? @socket
447:           Kernel.at_exit{ FileUtils::rm_f @socket }
448:           @uri = "drbunix://#{ socket }"
449:           trace{ "parent - uri <#{ @uri }>" }
450:         #
451:         # starting drb on localhost avoids dns lookups!
452:         #
453:           DRb::start_service('druby://localhost:0', nil) unless DRb::thread
454:           @object = DRbObject::new nil, @uri
455:           if @object.respond_to? '__slave_object_failure__'
456:             c, m, bt = Marshal.load @object.__slave_object_failure__
457:             (e = c.new(m)).set_backtrace bt
458:             trace{ %Q[#{ e.message } (#{ e.class })\n#{ e.backtrace.join "\n" }] }
459:             raise e 
460:           end
461:           @psname ||= gen_psname(@object)
462:         else
463:           raise "failed to find slave socket <#{ @socket }>"
464:         end
465:       end
466: #--}}}
467:     end

a simple convenience method which returns an object from another process. the object returned is the result of the supplied block. eg

  object = Slave.object{ processor_intensive_object_built_in_child_process() }

eg.

the call can be made asynchronous via the ‘async’/:async keyword

  thread = Slave.object(:async=>true){ long_processor_intensive_object_built_in_child_process() }

  # go on about your coding business then, later

  object = thread.value

[Source]

     # File lib/slave.rb, line 587
587:     def self.object opts = {}, &b
588: #--{{{
589:       async = opts.delete('async') || opts.delete(:async) 
590: 
591:       opts['object'] = opts[:object] = lambda(&b)
592:       opts['dumped'] = opts[:dumped] = true 
593: 
594:       slave = Slave.new opts
595: 
596:       value = lambda do |slave|
597:         begin
598:           slave.object.call
599:         ensure
600:           slave.shutdown
601:         end
602:       end
603: 
604:       async ? Thread.new{ value[slave] } : value[slave] 
605: #--}}}
606:     end

[Source]

    # File lib/slave.rb, line 44
44:     def self.version() VERSION end

Public Instance methods

see docs for Slave.default

[Source]

     # File lib/slave.rb, line 546
546:     def default key
547: #--{{{
548:       self.class.default key
549: #--}}}
550:     end

starts a thread to collect the child status and sets up at_exit handler to prevent zombies. the at_exit handler is canceled if the thread is able to collect the status

[Source]

     # File lib/slave.rb, line 473
473:     def detach
474: #--{{{
475:       reap = lambda do |cid|
476:         begin
477:           @status = Process::waitpid2(cid).last
478:         rescue Exception => e 
479:           m, c, b = e.message, e.class, e.backtrace.join("\n")
480:           warn "#{ m } (#{ c })\n#{ b }"  unless e.is_a? Errno::ECHILD
481:         end
482:       end
483: 
484:       Kernel.at_exit do
485:         shutdown rescue nil
486:         reap[@pid] rescue nil
487:       end
488: 
489:       @waiter = 
490:         Thread.new do
491:           begin
492:             @status = Process::waitpid2(@pid).last
493:           ensure
494:             reap = lambda{|cid| 'no-op' }
495:           end
496:         end
497: #--}}}
498:     end

generate a default name to appear in ps/top

[Source]

     # File lib/slave.rb, line 538
538:     def gen_psname obj
539: #--{{{
540:       "slave_#{ obj.class }_#{ obj.object_id }_#{ Process::ppid }_#{ Process::pid }".downcase.gsub(%r/\s+/,'_')
541: #--}}}
542:     end

see docs for Slave.getopts

[Source]

     # File lib/slave.rb, line 554
554:     def getopts opts 
555: #--{{{
556:       self.class.getopts opts 
557: #--}}}
558:     end

cuts the lifeline and kills the child process - give the key ‘quiet’ to ignore errors shutting down, including having already shutdown

[Source]

     # File lib/slave.rb, line 517
517:     def shutdown opts = {}
518: #--{{{
519:       quiet = getopts(opts)['quiet']
520:       raise "already shutdown" if @shutdown unless quiet
521:       begin; Process::kill 'SIGUSR2', @pid; rescue Exception => e; end
522:       begin; @lifeline.cut; rescue Exception; end
523:       raise e if e unless quiet
524:       @shutdown = true
525: #--}}}
526:     end

true

[Source]

     # File lib/slave.rb, line 530
530:     def shutdown?
531: #--{{{
532:       @shutdown
533: #--}}}
534:     end

debugging output - ENV[‘SLAVE_DEBUG’]=1 to enable

[Source]

     # File lib/slave.rb, line 562
562:     def trace
563: #--{{{
564:       if @debug 
565:         STDERR.puts yield
566:         STDERR.flush
567:       end
568: #--}}}
569:     end

wait for slave to finish. if the keyword ‘non_block’=>true is given a thread is returned to do the waiting in an async fashion. eg

  thread = slave.wait(:non_block=>true){|value| "background <#{ value }>"}

[Source]

     # File lib/slave.rb, line 505
505:     def wait opts = {}, &b
506: #--{{{
507:       b ||= lambda{|exit_status|}
508:       non_block = getopts(opts)['non_block']
509:       non_block ? Thread.new{ b[ @waiter.value ] } : b[ @waiter.value ]
510: #--}}}
511:     end
wait2(opts = {})

Alias for wait

[Validate]