lib/deep_test/warlock.rb in jstorimer-deep-test-1.4.0 vs lib/deep_test/warlock.rb in jstorimer-deep-test-2.0.0
- old
+ new
@@ -1,37 +1,34 @@
module DeepTest
class Warlock
- def initialize
+ def initialize(options)
+ @options = options
@demons_semaphore = Mutex.new
@demons = []
@reapers = []
end
- def start(name, &block)
+ def start(name, demon, *demon_args)
# Not synchronizing for the fork seems to cause
# random errors (Bus Error, Segfault, and GC non-object)
- # in RemoteWorkerServer processes.
+ # in Beachhead processes.
#
begin
pid = nil
@demons_semaphore.synchronize do
- pid = DeepTest.drb_safe_fork do
+ pid = fork do
# Fork leaves the semaphore locked and we'll never make it
# to end of synchronize block.
#
# The Ruby 1.8.6 C mutex implementation automatically treats
# a mutex locked by a dead thread as unlocked and will raise
# an error if we try to unlock it from this thread.
#
@demons_semaphore.unlock if @demons_semaphore.locked?
- begin
- yield
- rescue Exception => e
- DeepTest.logger.debug "Exception in #{name} (#{Process.pid}): #{e.message}"
- raise
- end
+ close_open_network_connections
+ demon.forked name, @options, demon_args
exit
end
raise "fatal: fork returned nil" if pid.nil?
@@ -44,47 +41,62 @@
puts "exception starting #{name}: #{e}"
puts "\t" + e.backtrace.join("\n\t")
end
end
+ def close_open_network_connections
+ ObjectSpace.each_object(BasicSocket) do |sock|
+ begin
+ sock.close
+ rescue IOError
+ end
+ end
+ end
+
def demon_count
@demons_semaphore.synchronize do
@demons.size
end
end
- def stop_all
- DeepTest.logger.debug("stopping all demons")
+ def stop_demons
+ DeepTest.logger.debug { "stopping all demons" }
receivers = @demons_semaphore.synchronize do
@demons.reverse
end
receivers.reverse.each do |demon|
name, pid = demon
if running?(pid)
- DeepTest.logger.debug("Sending SIGTERM to #{name}, #{pid}")
+ DeepTest.logger.debug { "Sending SIGTERM to #{name}, #{pid}" }
Process.kill("TERM", pid)
end
end
- DeepTest.logger.debug("Warlock: Stopped all receivers")
+ DeepTest.logger.debug { "Warlock: Stopped all receivers" }
- DeepTest.logger.debug("waiting for reapers")
+ DeepTest.logger.debug { "waiting for reapers" }
@reapers.each {|r| r.join}
- DeepTest.logger.debug("Warlock: done reaping processes")
+ DeepTest.logger.debug { "Warlock: done reaping processes" }
end
def exit_when_none_running
Thread.new do
- loop do
- Thread.pass
- exit(0) unless any_running?
- sleep(0.01)
- end
+ wait_for_all_to_finish
+ DeepTest.logger.debug { "exiting #{Process.pid} with all demons finished" }
+ exit(0)
end
end
+ def wait_for_all_to_finish
+ loop do
+ Thread.pass
+ return unless any_running?
+ sleep(0.01)
+ end
+ end
+
def any_running?
@demons_semaphore.synchronize do
@demons.any? {|name, pid| running?(pid)}
end
end
@@ -108,25 +120,25 @@
end
protected
def add_demon(name, pid)
- DeepTest.logger.debug "Started: #{name} (#{pid})"
+ DeepTest.logger.debug { "Started: #{name} (#{pid})" }
@demons << [name, pid]
end
def remove_demon(name, pid)
@demons.delete [name, pid]
- DeepTest.logger.debug "Stopped: #{name} (#{pid})"
+ DeepTest.logger.debug { "Stopped: #{name} (#{pid})" }
end
def launch_reaper_thread(name, pid)
@reapers << Thread.new do
Process.detach(pid).join
- DeepTest.logger.debug("#{name} (#{pid}) reaped")
+ DeepTest.logger.debug { "#{name} (#{pid}) reaped" }
@demons_semaphore.synchronize do
- DeepTest.logger.debug("Warlock Reaper: removing #{name} (#{pid}) from demon list")
+ DeepTest.logger.debug { "Warlock Reaper: removing #{name} (#{pid}) from demon list" }
remove_demon name, pid
end
end
end
end