lib/deep_test/warlock.rb in jstorimer-deep-test-0.2.0 vs lib/deep_test/warlock.rb in jstorimer-deep-test-1.0.0
- old
+ new
@@ -1,34 +1,37 @@
module DeepTest
class Warlock
- def initialize(options)
- @options = options
+ def initialize
@demons_semaphore = Mutex.new
@demons = []
@reapers = []
end
- def start(name, demon, *demon_args)
+ def start(name, &block)
# Not synchronizing for the fork seems to cause
# random errors (Bus Error, Segfault, and GC non-object)
- # in Beachhead processes.
+ # in RemoteWorkerServer processes.
#
begin
pid = nil
@demons_semaphore.synchronize do
- pid = fork do
+ pid = DeepTest.drb_safe_fork do
# Fork leaves the semaphore locked and we'll never make it
# to end of synchronize block.
#
# The Ruby 1.8.6 C mutex implementation automatically treats
# a mutex locked by a dead thread as unlocked and will raise
# an error if we try to unlock it from this thread.
#
@demons_semaphore.unlock if @demons_semaphore.locked?
- close_open_network_connections
- demon.forked name, @options, demon_args
+ begin
+ yield
+ rescue Exception => e
+ DeepTest.logger.debug "Exception in #{name} (#{Process.pid}): #{e.message}"
+ raise
+ end
exit
end
raise "fatal: fork returned nil" if pid.nil?
@@ -41,62 +44,47 @@
puts "exception starting #{name}: #{e}"
puts "\t" + e.backtrace.join("\n\t")
end
end
- def close_open_network_connections
- ObjectSpace.each_object(BasicSocket) do |sock|
- begin
- sock.close
- rescue IOError
- end
- end
- end
-
def demon_count
@demons_semaphore.synchronize do
@demons.size
end
end
- def stop_demons
- DeepTest.logger.debug { "stopping all demons" }
+ def stop_all
+ DeepTest.logger.debug("stopping all demons")
receivers = @demons_semaphore.synchronize do
@demons.reverse
end
receivers.reverse.each do |demon|
name, pid = demon
if running?(pid)
- DeepTest.logger.debug { "Sending SIGTERM to #{name}, #{pid}" }
+ DeepTest.logger.debug("Sending SIGTERM to #{name}, #{pid}")
Process.kill("TERM", pid)
end
end
- DeepTest.logger.debug { "Warlock: Stopped all receivers" }
+ DeepTest.logger.debug("Warlock: Stopped all receivers")
- DeepTest.logger.debug { "waiting for reapers" }
+ DeepTest.logger.debug("waiting for reapers")
@reapers.each {|r| r.join}
- DeepTest.logger.debug { "Warlock: done reaping processes" }
+ DeepTest.logger.debug("Warlock: done reaping processes")
end
def exit_when_none_running
Thread.new do
- wait_for_all_to_finish
- DeepTest.logger.debug { "exiting #{Process.pid} with all demons finished" }
- exit(0)
+ loop do
+ Thread.pass
+ exit(0) unless any_running?
+ sleep(0.01)
+ end
end
end
- def wait_for_all_to_finish
- loop do
- Thread.pass
- return unless any_running?
- sleep(0.01)
- end
- end
-
def any_running?
@demons_semaphore.synchronize do
@demons.any? {|name, pid| running?(pid)}
end
end
@@ -120,25 +108,25 @@
end
protected
def add_demon(name, pid)
- DeepTest.logger.debug { "Started: #{name} (#{pid})" }
+ DeepTest.logger.debug "Started: #{name} (#{pid})"
@demons << [name, pid]
end
def remove_demon(name, pid)
@demons.delete [name, pid]
- DeepTest.logger.debug { "Stopped: #{name} (#{pid})" }
+ DeepTest.logger.debug "Stopped: #{name} (#{pid})"
end
def launch_reaper_thread(name, pid)
@reapers << Thread.new do
Process.detach(pid).join
- DeepTest.logger.debug { "#{name} (#{pid}) reaped" }
+ DeepTest.logger.debug("#{name} (#{pid}) reaped")
@demons_semaphore.synchronize do
- DeepTest.logger.debug { "Warlock Reaper: removing #{name} (#{pid}) from demon list" }
+ DeepTest.logger.debug("Warlock Reaper: removing #{name} (#{pid}) from demon list")
remove_demon name, pid
end
end
end
end