# encoding: ascii-8bit # Copyright 2022 Ball Aerospace & Technologies Corp. # All Rights Reserved. # # This program is free software; you can modify and/or redistribute it # under the terms of the GNU Affero General Public License # as published by the Free Software Foundation; version 3 with # attribution addendums as found in the LICENSE.txt # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # Modified by OpenC3, Inc. # All changes Copyright 2024, OpenC3, Inc. # All Rights Reserved # # This file may also be used under the terms of a commercial license # if purchased from OpenC3, Inc. require 'openc3/config/config_parser' require 'openc3/packets/packet_config' require 'openc3/packets/commands' require 'openc3/packets/telemetry' require 'openc3/packets/limits' require 'openc3/system/target' require 'openc3/logs' require 'fileutils' require 'openc3/utilities/zip' require 'bundler' module OpenC3 # System is the primary entry point into the OpenC3 framework. It captures # system wide configuration items such as the available ports and paths to # various files used by the system. The #commands, #telemetry, and #limits # class variables are the primary access points for applications. The # #targets variable provides access to all the targets defined by the system. # Its primary responsibily is to load the system configuration file and # create all the Target instances. It also saves and restores configurations # using a hashing checksum over the entire configuration to detect changes. class SystemConfig # @return [String] Base path of the configuration attr_reader :userpath # @return [Boolean] Whether to use sound for alerts attr_reader :sound # @return [Boolean] Whether to use DNS to lookup IP addresses or not attr_reader :use_dns # @return [Hash] Hash of all the known targets attr_reader :targets # @return [Integer] The number of seconds before a telemetry packet is considered stale attr_reader :staleness_seconds # @return [Boolean] Whether to use UTC or local times attr_reader :use_utc # @return [Hash] Hash of the text/color to use for the classificaiton banner attr_reader :classificiation_banner # @param filename [String] Full path to the system configuration file to # read. def initialize(filename) reset_variables(filename) end # Resets the System's internal state to defaults. # # @param filename [String] Path to system.txt config file to process. Defaults to config/system/system.txt def reset_variables(filename) @targets = {} @config = nil @commands = nil @telemetry = nil @limits = nil @sound = false @use_dns = false @staleness_seconds = 30 @use_utc = false @meta_init_filename = nil @userpath = File.expand_path(File.join(File.dirname(filename), '..', '..')) process_file(filename, File.join(@userpath, 'config', 'targets')) @initial_filename = filename @initial_config = nil @config_blacklist = {} end # Process the system.txt configuration file # # @param filename [String] The configuration file # @param targets_config_dir [String] The configuration directory to # search for the target command and telemetry files. def process_file(filename, targets_config_dir) parser = ConfigParser.new("https://docs.openc3.com/docs") # First pass - Everything except targets parser.parse_file(filename) do |keyword, parameters| case keyword when 'AUTO_DECLARE_TARGETS', 'DECLARE_TARGET', 'DECLARE_GEM_TARGET', 'DECLARE_GEM_MULTI_TARGET' # Will be handled by second pass when 'PORT', 'LISTEN_HOST', 'CONNECT_HOST', 'PATH', 'DEFAULT_PACKET_LOG_WRITER', 'DEFAULT_PACKET_LOG_READER', 'ALLOW_ACCESS', 'ADD_HASH_FILE', 'ADD_MD5_FILE', 'HASHING_ALGORITHM' # Not used by OpenC3 5 when 'ENABLE_SOUND' usage = "#{keyword}" parser.verify_num_parameters(0, 0, usage) @sound = true when 'DISABLE_DNS' usage = "#{keyword}" parser.verify_num_parameters(0, 0, usage) @use_dns = false when 'ENABLE_DNS' usage = "#{keyword}" parser.verify_num_parameters(0, 0, usage) @use_dns = true when 'STALENESS_SECONDS' parser.verify_num_parameters(1, 1, "#{keyword} ") @staleness_seconds = Integer(parameters[0]) when 'META_INIT' parser.verify_num_parameters(1, 1, "#{keyword} ") @meta_init_filename = ConfigParser.handle_nil(parameters[0]) when 'TIME_ZONE_UTC' parser.verify_num_parameters(0, 0, "#{keyword}") @use_utc = true when 'CLASSIFICATION' parser.verify_num_parameters(2, 4, "#{keyword} ") # Determine if the OpenC3 color already exists, otherwise create a new one if OpenC3.constants.include? parameters[1].upcase.to_sym # We were given a named color that already exists in OpenC3 color = parameters[1].upcase else if parameters.length < 4 # We were given a named color, but it didn't exist in OpenC3 already color = OpenC3.getColor(parameters[1].upcase) else # We were given RGB values color = OpenC3.getColor(parameters[1], parameters[2], parameters[3]) end end @classificiation_banner = { 'display_text' => parameters[0], 'color' => color } else # blank lines will have a nil keyword and should not raise an exception raise parser.error("Unknown keyword '#{keyword}'") if keyword end # case keyword end # parser.parse_file # Explicitly set up time to use UTC or local if @use_utc Time.use_utc() else Time.use_local() end # Second pass - Process targets process_targets(parser, filename, targets_config_dir) end # def process_file # Parse the system.txt configuration file looking for keywords associated # with targets and create all the Target instances in the system. # # @param parser [ConfigParser] Parser created by process_file # @param filename (see #process_file) # @param targets_config_dir (see #process_file) def process_targets(parser, filename, targets_config_dir) parser.parse_file(filename) do |keyword, parameters| case keyword when 'AUTO_DECLARE_TARGETS' usage = "#{keyword}" parser.verify_num_parameters(0, 0, usage) path = File.join(@userpath, 'config', 'targets') unless File.exist? path raise parser.error("#{path} must exist", usage) end dirs = [] configuration_dir = File.join(@userpath, 'config', 'targets') Dir.foreach(configuration_dir) { |dir_filename| dirs << dir_filename } dirs.sort! dirs.each do |dir_filename| if dir_filename[0] != '.' if dir_filename == dir_filename.upcase # If any of the targets original directory name matches the # current directory then it must have been already processed by # DECLARE_TARGET so we skip it. next if @targets.select { |_name, target| target.original_name == dir_filename }.length > 0 next if dir_filename == 'SYSTEM' target = Target.new(dir_filename, nil, targets_config_dir) @targets[target.name] = target else raise parser.error("Target folder must be uppercase: '#{dir_filename}'") end end end auto_detect_gem_based_targets() when 'DECLARE_TARGET' usage = "#{keyword} " parser.verify_num_parameters(1, 3, usage) target_name = parameters[0].to_s.upcase if targets_config_dir folder_name = File.join(targets_config_dir, target_name) else folder_name = File.join(@userpath, 'config', 'targets', target_name) end unless Dir.exist?(folder_name) raise parser.error("Target folder must exist '#{folder_name}'.") end substitute_name = nil substitute_name = ConfigParser.handle_nil(parameters[1]) if substitute_name substitute_name = substitute_name.to_s.upcase original_name = target_name target_name = substitute_name else original_name = nil end target = Target.new(target_name, original_name, targets_config_dir, ConfigParser.handle_nil(parameters[2])) @targets[target.name] = target when 'DECLARE_GEM_TARGET' usage = "#{keyword} " parser.verify_num_parameters(1, 3, usage) # Remove 'openc3' from the gem name 'openc3-power-supply' target_name = parameters[0].split('-')[1..-1].join('-').to_s.upcase gem_dir = Gem::Specification.find_by_name(parameters[0]).gem_dir substitute_name = nil substitute_name = ConfigParser.handle_nil(parameters[1]) if substitute_name substitute_name = substitute_name.to_s.upcase original_name = target_name target_name = substitute_name else original_name = nil end target = Target.new(target_name, original_name, targets_config_dir, ConfigParser.handle_nil(parameters[2]), gem_dir) @targets[target.name] = target when 'DECLARE_GEM_MULTI_TARGET' usage = "#{keyword} " parser.verify_num_parameters(2, 4, usage) target_name = parameters[1].to_s.upcase gem_dir = Gem::Specification.find_by_name(parameters[0]).gem_dir gem_dir = File.join(gem_dir, target_name) substitute_name = nil substitute_name = ConfigParser.handle_nil(parameters[2]) if substitute_name substitute_name = substitute_name.to_s.upcase original_name = target_name target_name = substitute_name else original_name = nil end target = Target.new(target_name, original_name, targets_config_dir, ConfigParser.handle_nil(parameters[3]), gem_dir) @targets[target.name] = target end # case keyword end # parser.parse_file # Make sure SYSTEM target is always present and added last unless @targets.key?('SYSTEM') target = Target.new('SYSTEM', nil, targets_config_dir) @targets[target.name] = target end end protected def unzip(zip_file_name) zip_dir = File.join(@paths['TMP'], File.basename(zip_file_name, ".*")) # Only unzip if we have to. We assume the unzipped directory structure is # intact. If not they'll get a popop with the errors encountered when # loading the configuration. unless File.exist? zip_dir Zip::File.open(zip_file_name) do |zip_file| zip_file.each do |entry| path = File.join(@paths['TMP'], entry.name) FileUtils.mkdir_p(File.dirname(path)) zip_file.extract(entry, path) unless File.exist?(path) end end end zip_dir end # A helper method to make the zip writing recursion work def write_zip_entries(base_dir, entries, zip_path, io) io.add(zip_path, base_dir) # Add the directory whether it has entries or not entries.each do |e| zip_file_path = File.join(zip_path, e) disk_file_path = File.join(base_dir, e) if File.directory? disk_file_path recursively_deflate_directory(disk_file_path, io, zip_file_path) else put_into_archive(disk_file_path, io, zip_file_path) end end end def recursively_deflate_directory(disk_file_path, io, zip_file_path) io.add(zip_file_path, disk_file_path) entries = Dir.entries(disk_file_path) - %w(. ..) write_zip_entries(disk_file_path, entries, zip_file_path, io) end def put_into_archive(disk_file_path, io, zip_file_path) io.get_output_stream(zip_file_path) do |f| data = nil File.open(disk_file_path, 'rb') do |file| data = file.read end f.write(data) end end def auto_detect_gem_based_targets Bundler.load.specs.each do |spec| spec_name_split = spec.name.split('-') if spec_name_split.length > 1 && (spec_name_split[0] == 'openc3') # search for multiple targets packaged in a single gem dirs = [] Dir.foreach(spec.gem_dir) { |dir_filename| dirs << dir_filename } dirs.sort! dirs.each do |dir_filename| if dir_filename == "." # check the base directory curr_dir = spec.gem_dir target_name = spec_name_split[1..-1].join('-').to_s.upcase else # check for targets in other directories 1 level deep next if dir_filename[0] == '.' # skip dot directories and ".." next if dir_filename != dir_filename.upcase # skip non uppercase directories curr_dir = File.join(spec.gem_dir, dir_filename) target_name = dir_filename end # check for the cmd_tlm directory - if it has it, then we have found a target if File.directory?(File.join(curr_dir, 'cmd_tlm')) # If any of the targets original directory name matches the # current directory then it must have been already processed by # DECLARE_TARGET so we skip it. next if @targets.select { |_name, target| target.original_name == target_name }.length > 0 target = Target.new(target_name, nil, nil, nil, spec.gem_dir) @targets[target.name] = target end end end end rescue Bundler::GemfileNotFound # No Gemfile - so no gem based targets end def save_configuration configuration = find_configuration(@config.name) configuration = File.join(@paths['SAVED_CONFIG'], File.build_timestamped_filename([@config.name], '.zip')) unless configuration unless File.exist?(configuration) configuration_tmp = File.join(@paths['SAVED_CONFIG'], File.build_timestamped_filename(['tmp_' + @config.name], '.zip.tmp')) begin Zip.continue_on_exists_proc = true Zip::File.open(configuration_tmp, Zip::File::CREATE) do |zipfile| zip_file_path = File.basename(configuration, ".zip") zipfile.mkdir zip_file_path # Copy target files into archive zip_targets = [] @targets.each do |_target_name, target| entries = Dir.entries(target.dir) - %w(. ..) zip_target = File.join(zip_file_path, target.original_name) # Check the stored list of targets. We can't ask the zip file # itself because it's in progress and hasn't been saved unless zip_targets.include?(zip_target) write_zip_entries(target.dir, entries, zip_target, zipfile) zip_targets << zip_target end end # Create custom system.txt file zipfile.get_output_stream(File.join(zip_file_path, 'system.txt')) do |file| @targets.each do |_target_name, target| target_filename = File.basename(target.filename) target_filename = nil unless File.exist?(target.filename) # Create a newline character since Zip opens files in binary mode newline = Kernel.is_windows? ? "\r\n" : "\n" if target.substitute file.write "DECLARE_TARGET #{target.original_name} #{target.name} #{target_filename}#{newline}" else file.write "DECLARE_TARGET #{target.name} nil #{target_filename}#{newline}" end end end end File.rename(configuration_tmp, configuration) File.chmod(0444, configuration) # Mark readonly rescue Exception => e Logger.error "Problem saving configuration to #{configuration}: #{e.class}:#{e.message}\n#{e.backtrace.join("\n")}\n" end end end end end