lib/openc3/microservices/reducer_microservice.rb in openc3-5.1.1 vs lib/openc3/microservices/reducer_microservice.rb in openc3-5.2.0

- old
+ new

@@ -95,11 +95,11 @@ @config['options'].each do |option| case option[0].upcase when 'BUFFER_DEPTH' # Buffer depth to write in time order @buffer_depth = option[1].to_i else - Logger.error("Unknown option passed to microservice #{@name}: #{option}") + @logger.error("Unknown option passed to microservice #{@name}: #{option}") end end end @buffer_depth = 10 unless @buffer_depth @@ -127,11 +127,11 @@ # block until shutdown is called @scheduler.join end def shutdown - Logger.info("Shutting down reducer microservice: #{@name}") + @logger.info("Shutting down reducer microservice: #{@name}") @scheduler.shutdown(wait: SHUTDOWN_DELAY_SECS) if @scheduler # Make sure all the existing logs are properly closed down threads = [] @packet_logs.each do |name, log| @@ -191,10 +191,14 @@ end def process_file(filename, type, entry_nanoseconds, file_nanoseconds) file = BucketFile.new(filename) file.retrieve + unless file.local_path + @logger.warn("Reducer Warning: #{filename}: Could not be retrieved") + return + end # Determine if we already have a PacketLogWriter created _, _, scope, target_name, _, rt_or_stored, _ = File.basename(filename).split('__') stored = (rt_or_stored == "stored") @@ -269,70 +273,90 @@ reduced = state.reduced if type == 'minute' # Update statistics for this packet's raw values state.raw_values.each do |key, value| - reduced["#{key}__VALS"] ||= [] - reduced["#{key}__VALS"] << value - reduced["#{key}__N"] ||= value - reduced["#{key}__N"] = value if value < reduced["#{key}__N"] - reduced["#{key}__X"] ||= value - reduced["#{key}__X"] = value if value > reduced["#{key}__X"] + if value + reduced["#{key}__VALS"] ||= [] + reduced["#{key}__VALS"] << value + reduced["#{key}__N"] ||= value + reduced["#{key}__N"] = value if value < reduced["#{key}__N"] + reduced["#{key}__X"] ||= value + reduced["#{key}__X"] = value if value > reduced["#{key}__X"] + end end # Update statistics for this packet's converted values state.converted_values.each do |key, value| - reduced["#{key}__CVALS"] ||= [] - reduced["#{key}__CVALS"] << value - reduced["#{key}__CN"] ||= value - reduced["#{key}__CN"] = value if value < reduced["#{key}__CN"] - reduced["#{key}__CX"] ||= value - reduced["#{key}__CX"] = value if value > reduced["#{key}__CX"] + if value + reduced["#{key}__CVALS"] ||= [] + reduced["#{key}__CVALS"] << value + reduced["#{key}__CN"] ||= value + reduced["#{key}__CN"] = value if value < reduced["#{key}__CN"] + reduced["#{key}__CX"] ||= value + reduced["#{key}__CX"] = value if value > reduced["#{key}__CX"] + end end else # Update statistics for this packet's raw values state.raw_max_values.each do |key, value| - max_key = "#{key}__X" - reduced[max_key] ||= value - reduced[max_key] = value if value > reduced[max_key] + if value + max_key = "#{key}__X" + reduced[max_key] ||= value + reduced[max_key] = value if value > reduced[max_key] + end end state.raw_min_values.each do |key, value| - min_key = "#{key}__N" - reduced[min_key] ||= value - reduced[min_key] = value if value < reduced[min_key] + if value + min_key = "#{key}__N" + reduced[min_key] ||= value + reduced[min_key] = value if value < reduced[min_key] + end end state.raw_avg_values.each do |key, value| - avg_values_key = "#{key}__AVGVALS" - reduced[avg_values_key] ||= [] - reduced[avg_values_key] << value + if value + avg_values_key = "#{key}__AVGVALS" + reduced[avg_values_key] ||= [] + reduced[avg_values_key] << value + end end state.raw_stddev_values.each do |key, value| - stddev_values_key = "#{key}__STDDEVVALS" - reduced[stddev_values_key] ||= [] - reduced[stddev_values_key] << value + if value + stddev_values_key = "#{key}__STDDEVVALS" + reduced[stddev_values_key] ||= [] + reduced[stddev_values_key] << value + end end # Update statistics for this packet's converted values state.converted_max_values.each do |key, value| - max_key = "#{key}__CX" - reduced[max_key] ||= value - reduced[max_key] = value if value > reduced[max_key] + if value + max_key = "#{key}__CX" + reduced[max_key] ||= value + reduced[max_key] = value if value > reduced[max_key] + end end state.converted_min_values.each do |key, value| - min_key = "#{key}__CN" - reduced[min_key] ||= value - reduced[min_key] = value if value < reduced[min_key] + if value + min_key = "#{key}__CN" + reduced[min_key] ||= value + reduced[min_key] = value if value < reduced[min_key] + end end state.converted_avg_values.each do |key, value| - avg_values_key = "#{key}__CAVGVALS" - reduced[avg_values_key] ||= [] - reduced[avg_values_key] << value + if value + avg_values_key = "#{key}__CAVGVALS" + reduced[avg_values_key] ||= [] + reduced[avg_values_key] << value + end end state.converted_stddev_values.each do |key, value| - stddev_values_key = "#{key}__CSTDDEVVALS" - reduced[stddev_values_key] ||= [] - reduced[stddev_values_key] << value + if value + stddev_values_key = "#{key}__CSTDDEVVALS" + reduced[stddev_values_key] ||= [] + reduced[stddev_values_key] << value + end end reduced["_NUM_SAMPLES__VALS"] ||= [] reduced["_NUM_SAMPLES__VALS"] << packet.read('_NUM_SAMPLES') end @@ -343,12 +367,12 @@ write_all_entries(reducer_state, plw, type, target_name, stored) true rescue => e if file.local_path and File.exist?(file.local_path) - Logger.error("Reducer Error: #{filename}: #{File.size(file.local_path)} bytes: \n#{e.formatted}") + @logger.error("Reducer Error: #{filename}: #{File.size(file.local_path)} bytes: \n#{e.formatted}") else - Logger.error("Reducer Error: #{filename}: \n#{e.formatted}") + @logger.error("Reducer Error: #{filename}: \n#{e.formatted}") end false end def check_new_file(reducer_state, plw, type, target_name, stored, current_time, file_nanoseconds)