lib/hydra/master.rb in sskirby-hydra-0.17.1 vs lib/hydra/master.rb in sskirby-hydra-0.21.0

- old
+ new

@@ -1,13 +1,17 @@ require 'hydra/hash' require 'open3' require 'tmpdir' +require 'erb' require 'yaml' + module Hydra #:nodoc: # Hydra class responsible for delegate work down to workers. # # The Master is run once for any given testing session. + class YmlLoadError < StandardError; end + class Master include Hydra::Messages::Master include Open3 traceable('MASTER') @@ -27,20 +31,27 @@ # * :verbose # * Set to true to see lots of Hydra output (for debugging) # * :autosort # * Set to false to disable automatic sorting by historical run-time per file def initialize(opts = { }) - trap("SIGINT") do - puts "Testing halted by user. Untested files:" - puts @incomplete_files.join("\n") - exit - end - opts.stringify_keys! config_file = opts.delete('config') { nil } if config_file - opts.merge!(YAML.load_file(config_file).stringify_keys!) + + begin + config_erb = ERB.new(IO.read(config_file)).result(binding) + rescue Exception => e + raise(YmlLoadError,"config file was found, but could not be parsed with ERB.\n#{$!.inspect}") + end + + begin + config_yml = YAML::load(config_erb) + rescue StandardError => e + raise(YmlLoadError,"config file was found, but could not be parsed.\n#{$!.inspect}") + end + + opts.merge!(config_yml.stringify_keys!) end @files = Array(opts.fetch('files') { nil }) raise "No files, nothing to do" if @files.empty? @incomplete_files = @files.dup @workers = [] @@ -53,13 +64,14 @@ end @verbose = opts.fetch('verbose') { false } @autosort = opts.fetch('autosort') { true } @sync = opts.fetch('sync') { nil } @environment = opts.fetch('environment') { 'test' } + @remote_require = opts.fetch('remote_require') {[]} if @autosort - sort_files_from_report + sort_files_from_report @event_listeners << Hydra::Listener::ReportGenerator.new(File.new(heuristic_file, 'w')) end # default is one worker that is configured to use a pipe with one runner worker_cfg = opts.fetch('workers') { [ { 'type' => 'local', 'runners' => 1} ] } @@ -74,12 +86,15 @@ boot_workers worker_cfg process_messages end # Message handling - - # Send a file down to a worker. + def worker_begin(worker) + @event_listeners.each {|l| l.worker_begin(worker) } + end + + # Send a file down to a worker. def send_file(worker) f = @files.shift if f trace "Sending #{f.inspect}" @event_listeners.each{|l| l.file_begin(f) } @@ -91,18 +106,24 @@ # Process the results coming back from the worker. def process_results(worker, message) if message.output =~ /ActiveRecord::StatementInvalid(.*)[Dd]eadlock/ or message.output =~ /PGError: ERROR(.*)[Dd]eadlock/ or - message.output =~ /Mysql::Error: SAVEPOINT(.*)does not exist: ROLLBACK/ + message.output =~ /Mysql::Error: SAVEPOINT(.*)does not exist: ROLLBACK/ or + message.output =~ /Mysql::Error: Deadlock found/ trace "Deadlock detected running [#{message.file}]. Will retry at the end" @files.push(message.file) + send_file(worker) else @incomplete_files.delete_at(@incomplete_files.index(message.file)) trace "#{@incomplete_files.size} Files Remaining" @event_listeners.each{|l| l.file_end(message.file, message.output) } if @incomplete_files.empty? + @workers.each do |worker| + @event_listeners.each{|l| l.worker_end(worker) } + end + shutdown_all_workers else send_file(worker) end end @@ -110,11 +131,11 @@ # A text report of the time it took to run each file attr_reader :report_text private - + def boot_workers(workers) trace "Booting #{workers.size} workers" workers.each do |worker| worker.stringify_keys! trace "worker opts #{worker.inspect}" @@ -129,38 +150,41 @@ end end def boot_local_worker(worker) runners = worker.fetch('runners') { raise "You must specify the number of runners" } - trace "Booting local worker" + trace "Booting local worker" pipe = Hydra::Pipe.new child = SafeFork.fork do pipe.identify_as_child Hydra::Worker.new(:io => pipe, :runners => runners, :verbose => @verbose) end + pipe.identify_as_parent @workers << { :pid => child, :io => pipe, :idle => false, :type => :local } end def boot_ssh_worker(worker) sync = Sync.new(worker, @sync, @verbose) + custom_require = @remote_require.map {|r| " require '#{r}';"}.join(' ') + runners = worker.fetch('runners') { raise "You must specify the number of runners" } - command = worker.fetch('command') { - "RAILS_ENV=#{@environment} ruby -e \"require 'rubygems'; require 'hydra'; Hydra::Worker.new(:io => Hydra::Stdio.new, :runners => #{runners}, :verbose => #{@verbose});\"" + command = worker.fetch('command') { + "RAILS_ENV=#{@environment} ruby -e \"require 'rubygems'; require 'hydra';#{custom_require} Hydra::Worker.new(:io => Hydra::Stdio.new, :runners => #{runners}, :verbose => #{@verbose}, :remote => '#{sync.connect}');\"" } - trace "Booting SSH worker" + trace "Booting SSH worker" ssh = Hydra::SSH.new("#{sync.ssh_opts} #{sync.connect}", sync.remote_dir, command) return { :io => ssh, :idle => false, :type => :ssh, :connect => sync.connect } end def shutdown_all_workers trace "Shutting down all workers" @workers.each do |worker| worker[:io].write(Shutdown.new) if worker[:io] - worker[:io].close if worker[:io] + worker[:io].close if worker[:io] end @listeners.each{|t| t.exit} end def process_messages @@ -180,19 +204,19 @@ message = worker[:io].gets trace "got message: #{message}" # if it exists and its for me. # SSH gives us back echoes, so we need to ignore our own messages if message and !message.class.to_s.index("Worker").nil? - message.handle(self, worker) + message.handle(self, worker) end rescue IOError trace "lost Worker [#{worker.inspect}]" Thread.exit end end end end - + @listeners.each{|l| l.join} @event_listeners.each{|l| l.testing_end} end def sort_files_from_report