lib/hydra/master.rb in sskirby-hydra-0.17.1 vs lib/hydra/master.rb in sskirby-hydra-0.21.0
- old
+ new
@@ -1,13 +1,17 @@
require 'hydra/hash'
require 'open3'
require 'tmpdir'
+require 'erb'
require 'yaml'
+
module Hydra #:nodoc:
# Hydra class responsible for delegate work down to workers.
#
# The Master is run once for any given testing session.
+ class YmlLoadError < StandardError; end
+
class Master
include Hydra::Messages::Master
include Open3
traceable('MASTER')
@@ -27,20 +31,27 @@
# * :verbose
# * Set to true to see lots of Hydra output (for debugging)
# * :autosort
# * Set to false to disable automatic sorting by historical run-time per file
def initialize(opts = { })
- trap("SIGINT") do
- puts "Testing halted by user. Untested files:"
- puts @incomplete_files.join("\n")
- exit
- end
-
opts.stringify_keys!
config_file = opts.delete('config') { nil }
if config_file
- opts.merge!(YAML.load_file(config_file).stringify_keys!)
+
+ begin
+ config_erb = ERB.new(IO.read(config_file)).result(binding)
+ rescue Exception => e
+ raise(YmlLoadError,"config file was found, but could not be parsed with ERB.\n#{$!.inspect}")
+ end
+
+ begin
+ config_yml = YAML::load(config_erb)
+ rescue StandardError => e
+ raise(YmlLoadError,"config file was found, but could not be parsed.\n#{$!.inspect}")
+ end
+
+ opts.merge!(config_yml.stringify_keys!)
end
@files = Array(opts.fetch('files') { nil })
raise "No files, nothing to do" if @files.empty?
@incomplete_files = @files.dup
@workers = []
@@ -53,13 +64,14 @@
end
@verbose = opts.fetch('verbose') { false }
@autosort = opts.fetch('autosort') { true }
@sync = opts.fetch('sync') { nil }
@environment = opts.fetch('environment') { 'test' }
+ @remote_require = opts.fetch('remote_require') {[]}
if @autosort
- sort_files_from_report
+ sort_files_from_report
@event_listeners << Hydra::Listener::ReportGenerator.new(File.new(heuristic_file, 'w'))
end
# default is one worker that is configured to use a pipe with one runner
worker_cfg = opts.fetch('workers') { [ { 'type' => 'local', 'runners' => 1} ] }
@@ -74,12 +86,15 @@
boot_workers worker_cfg
process_messages
end
# Message handling
-
- # Send a file down to a worker.
+ def worker_begin(worker)
+ @event_listeners.each {|l| l.worker_begin(worker) }
+ end
+
+ # Send a file down to a worker.
def send_file(worker)
f = @files.shift
if f
trace "Sending #{f.inspect}"
@event_listeners.each{|l| l.file_begin(f) }
@@ -91,18 +106,24 @@
# Process the results coming back from the worker.
def process_results(worker, message)
if message.output =~ /ActiveRecord::StatementInvalid(.*)[Dd]eadlock/ or
message.output =~ /PGError: ERROR(.*)[Dd]eadlock/ or
- message.output =~ /Mysql::Error: SAVEPOINT(.*)does not exist: ROLLBACK/
+ message.output =~ /Mysql::Error: SAVEPOINT(.*)does not exist: ROLLBACK/ or
+ message.output =~ /Mysql::Error: Deadlock found/
trace "Deadlock detected running [#{message.file}]. Will retry at the end"
@files.push(message.file)
+ send_file(worker)
else
@incomplete_files.delete_at(@incomplete_files.index(message.file))
trace "#{@incomplete_files.size} Files Remaining"
@event_listeners.each{|l| l.file_end(message.file, message.output) }
if @incomplete_files.empty?
+ @workers.each do |worker|
+ @event_listeners.each{|l| l.worker_end(worker) }
+ end
+
shutdown_all_workers
else
send_file(worker)
end
end
@@ -110,11 +131,11 @@
# A text report of the time it took to run each file
attr_reader :report_text
private
-
+
def boot_workers(workers)
trace "Booting #{workers.size} workers"
workers.each do |worker|
worker.stringify_keys!
trace "worker opts #{worker.inspect}"
@@ -129,38 +150,41 @@
end
end
def boot_local_worker(worker)
runners = worker.fetch('runners') { raise "You must specify the number of runners" }
- trace "Booting local worker"
+ trace "Booting local worker"
pipe = Hydra::Pipe.new
child = SafeFork.fork do
pipe.identify_as_child
Hydra::Worker.new(:io => pipe, :runners => runners, :verbose => @verbose)
end
+
pipe.identify_as_parent
@workers << { :pid => child, :io => pipe, :idle => false, :type => :local }
end
def boot_ssh_worker(worker)
sync = Sync.new(worker, @sync, @verbose)
+ custom_require = @remote_require.map {|r| " require '#{r}';"}.join(' ')
+
runners = worker.fetch('runners') { raise "You must specify the number of runners" }
- command = worker.fetch('command') {
- "RAILS_ENV=#{@environment} ruby -e \"require 'rubygems'; require 'hydra'; Hydra::Worker.new(:io => Hydra::Stdio.new, :runners => #{runners}, :verbose => #{@verbose});\""
+ command = worker.fetch('command') {
+ "RAILS_ENV=#{@environment} ruby -e \"require 'rubygems'; require 'hydra';#{custom_require} Hydra::Worker.new(:io => Hydra::Stdio.new, :runners => #{runners}, :verbose => #{@verbose}, :remote => '#{sync.connect}');\""
}
- trace "Booting SSH worker"
+ trace "Booting SSH worker"
ssh = Hydra::SSH.new("#{sync.ssh_opts} #{sync.connect}", sync.remote_dir, command)
return { :io => ssh, :idle => false, :type => :ssh, :connect => sync.connect }
end
def shutdown_all_workers
trace "Shutting down all workers"
@workers.each do |worker|
worker[:io].write(Shutdown.new) if worker[:io]
- worker[:io].close if worker[:io]
+ worker[:io].close if worker[:io]
end
@listeners.each{|t| t.exit}
end
def process_messages
@@ -180,19 +204,19 @@
message = worker[:io].gets
trace "got message: #{message}"
# if it exists and its for me.
# SSH gives us back echoes, so we need to ignore our own messages
if message and !message.class.to_s.index("Worker").nil?
- message.handle(self, worker)
+ message.handle(self, worker)
end
rescue IOError
trace "lost Worker [#{worker.inspect}]"
Thread.exit
end
end
end
end
-
+
@listeners.each{|l| l.join}
@event_listeners.each{|l| l.testing_end}
end
def sort_files_from_report