require 'roby' require 'active_support/core_ext/string/inflections' class String # :nodoc: all include ActiveSupport::CoreExtensions::String::Inflections end require 'test/unit' require 'roby/test/common' require 'roby/test/tools' require 'fileutils' module Roby module Test extend Logger::Hierarchy extend Logger::Forward @event_assertions = [] @waiting_threads = [] ASSERT_ANY_EVENTS_TLS = :assert_any_events class << self # A [thread, cv, positive, negative] list of event assertions attr_reader :event_assertions # Tests for events in +positive+ and +negative+ and returns # the set of failing events if the assertion has finished. # If the set is empty, it means that the assertion finished # successfully def assert_any_event_result(positive, negative) if positive_ev = positive.find { |ev| ev.happened? } return false, "#{positive_ev} happened" end failure = negative.find_all { |ev| ev.happened? } unless failure.empty? return true, "#{failure} happened" end if positive.all? { |ev| ev.unreachable? } return true, "all positive events are unreachable" end nil end # This method is inserted in the control thread to implement # Assertions#assert_events def check_event_assertions event_assertions.delete_if do |thread, cv, positive, negative| error, result = assert_any_event_result(positive, negative) if !error.nil? thread[ASSERT_ANY_EVENTS_TLS] = [error, result] cv.broadcast true end end end def finalize_event_assertions check_event_assertions event_assertions.dup.each do |thread, *_| thread.raise ControlQuitError end end # A set of threads waiting for something to happen. This is used # during #teardown to make sure no threads are block indefinitely attr_reader :waiting_threads # This proc is to be called by Control when it quits. It makes sure # that threads which are waiting are interrupted def interrupt_waiting_threads waiting_threads.dup.each do |task| task.raise ControlQuitError end ensure waiting_threads.clear end end Roby::Control.at_cycle_end(&method(:check_event_assertions)) Roby::Control.finalizers << method(:finalize_event_assertions) Roby::Control.finalizers << method(:interrupt_waiting_threads) module Assertions # Wait for any event in +positive+ to happen. If +negative+ is # non-empty, any event happening in this set will make the # assertion fail. If events in +positive+ are task events, the # :stop events of the corresponding tasks are added to negative # automatically. # # If a block is given, it is called from within the control thread # after the checks are in place # # So, to check that a task fails, do # # assert_events(task.event(:fail)) do # task.start! # end # def assert_any_event(positive, negative = [], msg = nil, &block) control_priority do Roby.condition_variable(false) do |cv| positive = Array[*positive].to_value_set negative = Array[*negative].to_value_set unreachability_reason = ValueSet.new Roby::Control.synchronize do positive.each do |ev| ev.if_unreachable(true) do |reason| unreachability_reason << reason if reason end end error, result = Test.assert_any_event_result(positive, negative) if error.nil? this_thread = Thread.current Test.event_assertions << [this_thread, cv, positive, negative] Roby.once(&block) if block_given? begin cv.wait(Roby::Control.mutex) ensure Test.event_assertions.delete_if { |thread, _| thread == this_thread } end error, result = this_thread[ASSERT_ANY_EVENTS_TLS] end if error if !unreachability_reason.empty? msg = unreachability_reason.map do |reason| if reason.respond_to?(:context) context = reason.context.map do |obj| if obj.kind_of?(Exception) obj.full_message else obj.to_s end end reason.to_s + context.join("\n ") end end msg.join("\n ") flunk("#{msg} all positive events are unreachable for the following reason:\n #{msg}") elsif msg flunk("#{msg} failed: #{result}") else flunk(result) end end end end end end # Starts +task+ and checks it succeeds def assert_succeeds(task, *args) control_priority do if !task.kind_of?(Roby::Task) Roby.execute do plan.insert(task = planner.send(task, *args)) end end assert_any_event([task.event(:success)], [], nil) do plan.permanent(task) task.start! if task.pending? yield if block_given? end end end def control_priority old_priority = Thread.current.priority Thread.current.priority = Roby.control.thread.priority + 1 yield ensure Thread.current.priority = old_priority end # This assertion fails if the relative error between +found+ and # +expected+is more than +error+ def assert_relative_error(expected, found, error, msg = "") if expected == 0 assert_in_delta(0, found, error, "comparing #{found} to #{expected} in #{msg}") else assert_in_delta(0, (found - expected) / expected, error, "comparing #{found} to #{expected} in #{msg}") end end # This assertion fails if +found+ and +expected+ are more than +dl+ # meters apart in the x, y and z coordinates, or +dt+ radians apart # in angles def assert_same_position(expected, found, dl = 0.01, dt = 0.01, msg = "") assert_relative_error(expected.x, found.x, dl, msg) assert_relative_error(expected.y, found.y, dl, msg) assert_relative_error(expected.z, found.z, dl, msg) assert_relative_error(expected.yaw, found.yaw, dt, msg) assert_relative_error(expected.pitch, found.pitch, dt, msg) assert_relative_error(expected.roll, found.roll, dt, msg) end end # This is the base class for running tests which uses a Roby control # loop (i.e. plan execution). # # Because configuration and planning can be robot-specific, parts of # the tests can also be splitted into generic parts and specific parts. # The TestCase.robot statement allows to specify that a given test case # is specific to a given robot, in which case it is ran only if the # call to scripts/test specified a robot which matches (i.e. # same name and type). # # Finally, two other mode of operation control the way tests are ran # [simulation] # if the --sim flag is given to scripts/test, the # tests are ran under simulation. Otherwise, they are run in live # mode (see Roby::Application for a description of simulation and # live modes). It is possible to constrain that a given test method # is run only in simulation or live mode with the TestCase.sim and # TestCase.nosim statements: # # sim :sim_only # def test_sim_only # end # # nosim :live_only # def test_live_only # end # [interactive] # Sometime, it is hard to actually assess the quality of processing # results automatically. In these cases, it is possible to show the # user the result of data processing, and then ask if the result is # valid by using the #user_validation method. Nonetheless, the tests # can be ran in automatic mode, in which the assertions which require # user validation are simply skipped. The --interactive or # -i flags of scripts/test specify that user # interaction is possible. class TestCase < Test::Unit::TestCase include Roby::Test include Assertions class << self attribute(:case_config) { Hash.new } attribute(:methods_config) { Hash.new } attr_reader :app_setup end # Sets the robot configuration for this test case. If a block is # given, it is called between the time the robot configuration is # loaded and the time the test methods are started. It can # therefore be used to change the robot configuration for the need # of this particular test case def self.robot(name, kind = name, &block) @app_setup = [name, kind, block] apply_robot_setup end @@first_time = true # Loads the configuration as specified by TestCase.robot def self.apply_robot_setup app = Roby.app if @@first_time # Make sure the log directory is empty if File.exists?(app.log_dir) if !Dir.new(app.log_dir).empty? if !STDIN.ask("#{app.log_dir} still exists and must be cleaned before starting. Proceed ? [N,y]", false) raise "user abort" end end FileUtils.rm_rf app.log_dir end @@first_time = false end name, kind, block = app_setup # Silently ignore the test suites which use a different robot if app.robot_name && (app.robot_name != name || app.robot_type != kind) return end app.robot name, kind app.reset app.single app.setup if block block.call end app.control.delete('executive') yield if block_given? end # Returns a fresh MainPlanner object for the current plan def planner MainPlanner.new(plan) end def setup # :nodoc: super Roby::Test.waiting_threads << Thread.current end def teardown # :nodoc: Roby::Test.waiting_threads.delete(Thread.current) super end def method_config # :nodoc: self.class.case_config.merge(self.class.methods_config[method_name] || Hash.new) end # Returns true if user interaction is to be disabled during this test def automatic_testing? Roby.app.automatic_testing? end # Progress report for the curren test. If +max+ is given, then # +value+ is assumed to be between 0 and +max+. Otherwise, +value+ # is a float value between 0 and 1 and is displayed as a percentage. def progress(value, max = nil) if max print "\r#{@method_name} progress: #{value}/#{max}" else print "\r#{@method_name} progress: #{"%.2f %%" % [value * 100]}" end STDOUT.flush end def user_interaction return unless automatic_testing? test_result = catch(:validation_result) do yield return end if test_result flunk(*test_result) end end # Ask for user validation. The method first yields, and then asks # the user if the showed dataset is nominal. If the tests are ran # in automated mode (#automatic_testing? returns true), it does # nothing. def user_validation(msg) return if automatic_testing? assert_block(msg) do STDOUT.puts "Now validating #{msg}" yield STDIN.ask("\rIs the result OK ? [N,y]", false) end end # Do not run +test_name+ inside a simulation environment # +test_name+ is the name of the method without +test_+. For # instance: # nosim :init # def test_init # end # # See also TestCase.sim def self.nosim(*names) names.each do |test_name| config = (methods_config[test_name.to_s] ||= Hash.new) config[:mode] = :nosim end end # Run +test_name+ only inside a simulation environment # +test_name+ is the name of the method without +test_+. For # instance: # sim :init # def test_init # end # # See also TestCase.nosim def self.sim(*names) names.each do |test_name| config = (methods_config[test_name.to_s] ||= Hash.new) config[:mode] = :sim end end def self.suite # :nodoc: method_names = public_instance_methods(true) tests = method_names.delete_if {|method_name| method_name !~ /^(dataset|test)./} suite = Test::Unit::TestSuite.new(name) tests.sort.each do |test| catch(:invalid_test) do suite << new(test) end end if (suite.empty?) catch(:invalid_test) do suite << new("default_test") end end return suite end def run(result) # :nodoc: Roby::Test.waiting_threads.clear self.class.apply_robot_setup do yield if block_given? case method_config[:mode] when :nosim return if Roby.app.simulation? when :sim return unless Roby.app.simulation? end @failed_test = false begin Roby.app.run do super end rescue Exception => e if @_result add_error(e) else raise end end keep_logdir = @failed_test || Roby.app.testing_keep_logs? save_logdir = (@failed_test && automatic_testing?) || Roby.app.testing_keep_logs? if save_logdir subdir = @failed_test ? 'failures' : 'results' basedir = File.join(APP_DIR, 'test', subdir) dirname = Roby::Application.unique_dirname(basedir, dataset_prefix) if Roby.app.testing_overwrites_logs? dirname.gsub! /\.\d+$/, '' FileUtils.rm_rf dirname end FileUtils.mv Roby.app.log_dir, dirname end if !keep_logdir FileUtils.rm_rf Roby.app.log_dir end end rescue Exception puts "testcase #{method_name} teardown failed with\n#{$!.full_message}" end def add_error(*args, &block) # :nodoc: @failed_test = true super end def add_failure(*args, &block) # :nodoc: @failed_test = true super end # The directory in which datasets are to be saved def datasets_dir "#{APP_DIR}/test/datasets" end # The directory into which the datasets generated by the current # testcase are to be saved. def dataset_prefix "#{Roby.app.robot_name}-#{self.class.name.gsub('TC_', '').underscore}/, '')}" end # Returns the full path of the file name into which the log file +file+ # should be saved to be referred to as the +dataset_name+ dataset def dataset_file_path(dataset_name, file) path = File.join(datasets_dir, dataset_name, file) if !File.file?(path) raise "#{path} does not exist" end path rescue flunk("dataset #{dataset_name} has not been generated: #{$!.message}") end # Saves +file+, which is taken in the log directory, in the # test/datasets directory. The data set is saved as # 'robot-testname-testmethod-suffix' def save_dataset(files = nil, suffix = '') destname = dataset_prefix destname << "-#{suffix}" unless suffix.empty? dir = File.join(datasets_dir, destname) if File.exists?(dir) relative_dir = dir.gsub(/^#{Regexp.quote(APP_DIR)}/, '') unless STDIN.ask("\r#{relative_dir} already exists. Delete ? [N,y]", false) raise "user abort" end FileUtils.rm_rf dir end FileUtils.mkdir_p(dir) files ||= Dir.entries(Roby.app.log_dir).find_all do |path| File.file? File.join(Roby.app.log_dir, path) end [*files].each do |path| FileUtils.mv "#{Roby.app.log_dir}/#{path}", dir end end def sampling(*args, &block); Test.sampling(*args, &block) end def stats(*args, &block); Test.stats(*args, &block) end end end end