lib/openc3/microservices/reducer_microservice.rb in openc3-5.10.1 vs lib/openc3/microservices/reducer_microservice.rb in openc3-5.11.0

- old
+ new

@@ -23,12 +23,14 @@
 require 'openc3/microservices/microservice'
 require 'openc3/topics/topic'
 require 'openc3/topics/telemetry_reduced_topics'
 require 'openc3/packets/json_packet'
 require 'openc3/utilities/bucket_file_cache'
+require 'openc3/utilities/throttle'
 require 'openc3/models/reducer_model'
 require 'openc3/logs/buffered_packet_log_writer'
+require 'openc3/ext/reducer_microservice' if RUBY_ENGINE == 'ruby' and !ENV['OPENC3_NO_EXT']
 require 'rufus-scheduler'

 module OpenC3
   class ReducerState
     attr_accessor :reduced
@@ -94,17 +96,20 @@
       if @config['options']
         @config['options'].each do |option|
           case option[0].upcase
           when 'BUFFER_DEPTH' # Buffer depth to write in time order
             @buffer_depth = option[1].to_i
+          when 'MAX_CPU_UTILIZATION'
+            @max_cpu_utilization = Float(option[1])
           else
             @logger.error("Unknown option passed to microservice #{@name}: #{option}")
           end
         end
       end
       @buffer_depth = 10 unless @buffer_depth
+      @max_cpu_utilization = 30.0 unless @max_cpu_utilization

       @target_name = name.split('__')[-1]
       @packet_logs = {}
       @error_count = 0
       @metric.set(name: 'reducer_total', value: @count, type: 'counter')
@@ -187,16 +192,18 @@
         end
       end
     end

     def process_file(filename, type, entry_nanoseconds, file_nanoseconds)
+      throttle = OpenC3::Throttle.new(@max_cpu_utilization)
       file = BucketFile.new(filename)
       file.retrieve
       unless file.local_path
         @logger.warn("Reducer Warning: #{filename}: Could not be retrieved")
         return
       end
+      throttle.throttle_sleep

       # Determine if we already have a PacketLogWriter created
       _, _, scope, target_name, _, rt_or_stored, _ = File.basename(filename).split('__')
       stored = (rt_or_stored == "stored")
@@ -215,10 +222,11 @@
       end

       # The lifetime of all these variables is a single file - single target / multiple packets
       reducer_state = {}
       plr = OpenC3::PacketLogReader.new
+      throttle.throttle_sleep
       plr.each(file.local_path) do |packet|
         # Check to see if we should start a new log file before processing this packet
         current_time = packet.packet_time.to_nsec_from_epoch
         check_new_file(reducer_state, plw, type, target_name, stored, current_time, file_nanoseconds)
         state = setup_state(reducer_state, packet, current_time)
@@ -237,136 +245,32 @@
             state = setup_state(reducer_state, packet, current_time)
           end
         end

         if type == 'minute'
-          state.entry_samples ||= packet.json_hash.dup # Grab all the samples from the first packet
-          if state.first
-            state.raw_values = packet.read_all(:RAW, nil, packet.read_all_names(:RAW)).select { |key, value| value.is_a?(Numeric) }
-            state.raw_keys ||= state.raw_values.keys
-            state.converted_values = packet.read_all(:CONVERTED, nil, packet.read_all_names(:CONVERTED)).select { |key, value| value.is_a?(Numeric) }
-            state.converted_keys ||= state.converted_values.keys
-          else
-            state.raw_values = packet.read_all(:RAW, nil, state.raw_keys).select { |key, value| value.is_a?(Numeric) }
-            state.converted_values = packet.read_all(:CONVERTED, nil, state.converted_keys).select { |key, value| value.is_a?(Numeric) }
-          end
+          get_min_samples(packet, state)
         else
-          # Hour or Day
-          state.entry_samples ||= extract_entry_samples(packet)
-          if state.first
-            state.raw_max_values = packet.read_all(:RAW, :MAX, packet.read_all_names(:RAW, :MAX))
-            state.raw_keys = state.raw_max_values.keys
-            state.converted_max_values = packet.read_all(:CONVERTED, :MAX, packet.read_all_names(:CONVERTED, :MAX))
-            state.converted_keys = state.converted_max_values.keys
-          else
-            state.raw_max_values = packet.read_all(:RAW, :MAX, state.raw_keys)
-            state.converted_max_values = packet.read_all(:CONVERTED, :MAX, state.converted_keys)
-          end
-          state.raw_min_values = packet.read_all(:RAW, :MIN, state.raw_keys)
-          state.raw_avg_values = packet.read_all(:RAW, :AVG, state.raw_keys)
-          state.raw_stddev_values = packet.read_all(:RAW, :STDDEV, state.raw_keys)
-          state.converted_min_values = packet.read_all(:CONVERTED, :MIN, state.converted_keys)
-          state.converted_avg_values = packet.read_all(:CONVERTED, :AVG, state.converted_keys)
-          state.converted_stddev_values = packet.read_all(:CONVERTED, :STDDEV, state.converted_keys)
+          get_hour_day_samples(packet, state)
         end

         reduced = state.reduced
         if type == 'minute'
-          # Update statistics for this packet's raw values
-          state.raw_values.each do |key, value|
-            if value
-              reduced["#{key}__VALS"] ||= []
-              reduced["#{key}__VALS"] << value
-              reduced["#{key}__N"] ||= value
-              reduced["#{key}__N"] = value if value < reduced["#{key}__N"]
-              reduced["#{key}__X"] ||= value
-              reduced["#{key}__X"] = value if value > reduced["#{key}__X"]
-            end
-          end
-
-          # Update statistics for this packet's converted values
-          state.converted_values.each do |key, value|
-            if value
-              reduced["#{key}__CVALS"] ||= []
-              reduced["#{key}__CVALS"] << value
-              reduced["#{key}__CN"] ||= value
-              reduced["#{key}__CN"] = value if value < reduced["#{key}__CN"]
-              reduced["#{key}__CX"] ||= value
-              reduced["#{key}__CX"] = value if value > reduced["#{key}__CX"]
-            end
-          end
+          update_min_stats(reduced, state)
         else
-          # Update statistics for this packet's raw values
-          state.raw_max_values.each do |key, value|
-            if value
-              max_key = "#{key}__X"
-              reduced[max_key] ||= value
-              reduced[max_key] = value if value > reduced[max_key]
-            end
-          end
-          state.raw_min_values.each do |key, value|
-            if value
-              min_key = "#{key}__N"
-              reduced[min_key] ||= value
-              reduced[min_key] = value if value < reduced[min_key]
-            end
-          end
-          state.raw_avg_values.each do |key, value|
-            if value
-              avg_values_key = "#{key}__AVGVALS"
-              reduced[avg_values_key] ||= []
-              reduced[avg_values_key] << value
-            end
-          end
-          state.raw_stddev_values.each do |key, value|
-            if value
-              stddev_values_key = "#{key}__STDDEVVALS"
-              reduced[stddev_values_key] ||= []
-              reduced[stddev_values_key] << value
-            end
-          end
-
-          # Update statistics for this packet's converted values
-          state.converted_max_values.each do |key, value|
-            if value
-              max_key = "#{key}__CX"
-              reduced[max_key] ||= value
-              reduced[max_key] = value if value > reduced[max_key]
-            end
-          end
-          state.converted_min_values.each do |key, value|
-            if value
-              min_key = "#{key}__CN"
-              reduced[min_key] ||= value
-              reduced[min_key] = value if value < reduced[min_key]
-            end
-          end
-          state.converted_avg_values.each do |key, value|
-            if value
-              avg_values_key = "#{key}__CAVGVALS"
-              reduced[avg_values_key] ||= []
-              reduced[avg_values_key] << value
-            end
-          end
-          state.converted_stddev_values.each do |key, value|
-            if value
-              stddev_values_key = "#{key}__CSTDDEVVALS"
-              reduced[stddev_values_key] ||= []
-              reduced[stddev_values_key] << value
-            end
-          end
-
-          reduced["_NUM_SAMPLES__VALS"] ||= []
-          reduced["_NUM_SAMPLES__VALS"] << packet.read('_NUM_SAMPLES')
+          update_raw_hour_day_stats(reduced, state)
+          update_converted_hour_day_stats(packet, reduced, state)
         end
-        state.first = false
+
+        throttle.throttle_sleep
       end
       file.delete # Remove the local copy
-      write_all_entries(reducer_state, plw, type, target_name, stored)
+      write_all_entries(reducer_state, plw, type, target_name, stored, throttle)
+      @logger.debug("Reducer Throttle: #{filename}: total_time: #{Time.now - throttle.reset_time}, sleep_time: #{throttle.total_sleep_time}")
+
       @count += 1
       @metric.set(name: 'reducer_total', value: @count, type: 'counter')
       true
     rescue => e
@@ -378,10 +282,144 @@
       @error_count += 1
       @metric.set(name: 'reducer_error_total', value: @error_count, type: 'counter')
       false
     end

+    def get_min_samples(packet, state)
+      state.entry_samples ||= packet.json_hash.dup # Grab all the samples from the first packet
+      if state.first
+        state.raw_values = packet.read_all(:RAW, nil, packet.read_all_names(:RAW)).select { |key, value| value.is_a?(Numeric) }
+        state.raw_keys ||= state.raw_values.keys
+        state.converted_values = packet.read_all(:CONVERTED, nil, packet.read_all_names(:CONVERTED)).select { |key, value| value.is_a?(Numeric) }
+        state.converted_keys ||= state.converted_values.keys
+      else
+        state.raw_values = packet.read_all(:RAW, nil, state.raw_keys).select { |key, value| value.is_a?(Numeric) }
+        state.converted_values = packet.read_all(:CONVERTED, nil, state.converted_keys).select { |key, value| value.is_a?(Numeric) }
+      end
+    end
+
+    def get_hour_day_samples(packet, state)
+      # Hour or Day
+      state.entry_samples ||= extract_entry_samples(packet)
+      if state.first
+        state.raw_max_values = packet.read_all(:RAW, :MAX, packet.read_all_names(:RAW, :MAX))
+        state.raw_keys = state.raw_max_values.keys
+        state.converted_max_values = packet.read_all(:CONVERTED, :MAX, packet.read_all_names(:CONVERTED, :MAX))
+        state.converted_keys = state.converted_max_values.keys
+      else
+        state.raw_max_values = packet.read_all(:RAW, :MAX, state.raw_keys)
+        state.converted_max_values = packet.read_all(:CONVERTED, :MAX, state.converted_keys)
+      end
+      state.raw_min_values = packet.read_all(:RAW, :MIN, state.raw_keys)
+      state.raw_avg_values = packet.read_all(:RAW, :AVG, state.raw_keys)
+      state.raw_stddev_values = packet.read_all(:RAW, :STDDEV, state.raw_keys)
+      state.converted_min_values = packet.read_all(:CONVERTED, :MIN, state.converted_keys)
+      state.converted_avg_values = packet.read_all(:CONVERTED, :AVG, state.converted_keys)
+      state.converted_stddev_values = packet.read_all(:CONVERTED, :STDDEV, state.converted_keys)
+    end
+
+    if RUBY_ENGINE != 'ruby' or ENV['OPENC3_NO_EXT']
+      def update_min_stats(reduced, state)
+        # Update statistics for this packet's raw values
+        state.raw_values.each do |key, value|
+          if value
+            vals_key = "#{key}__VALS"
+            reduced[vals_key] ||= []
+            reduced[vals_key] << value
+            n_key = "#{key}__N"
+            reduced[n_key] ||= value
+            reduced[n_key] = value if value < reduced[n_key]
+            x_key = "#{key}__X"
+            reduced[x_key] ||= value
+            reduced[x_key] = value if value > reduced[x_key]
+          end
+        end
+
+        # Update statistics for this packet's converted values
+        state.converted_values.each do |key, value|
+          if value
+            cvals_key = "#{key}__CVALS"
+            reduced[cvals_key] ||= []
+            reduced[cvals_key] << value
+            cn_key = "#{key}__CN"
+            reduced[cn_key] ||= value
+            reduced[cn_key] = value if value < reduced[cn_key]
+            cx_key = "#{key}__CX"
+            reduced[cx_key] ||= value
+            reduced[cx_key] = value if value > reduced[cx_key]
+          end
+        end
+      end
+    end
+
+    def update_raw_hour_day_stats(reduced, state)
+      # Update statistics for this packet's raw values
+      state.raw_max_values.each do |key, value|
+        if value
+          max_key = "#{key}__X"
+          reduced[max_key] ||= value
+          reduced[max_key] = value if value > reduced[max_key]
+        end
+      end
+      state.raw_min_values.each do |key, value|
+        if value
+          min_key = "#{key}__N"
+          reduced[min_key] ||= value
+          reduced[min_key] = value if value < reduced[min_key]
+        end
+      end
+      state.raw_avg_values.each do |key, value|
+        if value
+          avg_values_key = "#{key}__AVGVALS"
+          reduced[avg_values_key] ||= []
+          reduced[avg_values_key] << value
+        end
+      end
+      state.raw_stddev_values.each do |key, value|
+        if value
+          stddev_values_key = "#{key}__STDDEVVALS"
+          reduced[stddev_values_key] ||= []
+          reduced[stddev_values_key] << value
+        end
+      end
+    end
+
+    def update_converted_hour_day_stats(packet, reduced, state)
+      # Update statistics for this packet's converted values
+      state.converted_max_values.each do |key, value|
+        if value
+          max_key = "#{key}__CX"
+          reduced[max_key] ||= value
+          reduced[max_key] = value if value > reduced[max_key]
+        end
+      end
+      state.converted_min_values.each do |key, value|
+        if value
+          min_key = "#{key}__CN"
+          reduced[min_key] ||= value
+          reduced[min_key] = value if value < reduced[min_key]
+        end
+      end
+      state.converted_avg_values.each do |key, value|
+        if value
+          avg_values_key = "#{key}__CAVGVALS"
+          reduced[avg_values_key] ||= []
+          reduced[avg_values_key] << value
+        end
+      end
+      state.converted_stddev_values.each do |key, value|
+        if value
+          stddev_values_key = "#{key}__CSTDDEVVALS"
+          reduced[stddev_values_key] ||= []
+          reduced[stddev_values_key] << value
+        end
+      end
+
+      reduced["_NUM_SAMPLES__VALS"] ||= []
+      reduced["_NUM_SAMPLES__VALS"] << packet.read('_NUM_SAMPLES')
+    end
+
     def check_new_file(reducer_state, plw, type, target_name, stored, current_time, file_nanoseconds)
       plw_first_time_nsec = plw.buffered_first_time_nsec
       if plw_first_time_nsec && ((current_time - plw_first_time_nsec) >= file_nanoseconds)
         # Write out all entries in progress
         write_all_entries(reducer_state, plw, type, target_name, stored)
@@ -406,13 +444,14 @@
       state.current_time = current_time
       state.entry_time ||= state.current_time # Sets the entry time from the first packet
       return state
     end

-    def write_all_entries(reducer_state, plw, type, target_name, stored)
+    def write_all_entries(reducer_state, plw, type, target_name, stored, throttle = nil)
       reducer_state.each do |packet_name, state|
         write_entry(state, plw, type, target_name, packet_name, stored)
+        throttle.throttle_sleep if throttle
       end
     end

     def write_entry(state, plw, type, target_name, packet_name, stored)
       return unless state.reduced.length > 0
@@ -482,29 +521,34 @@
     def reduce_running(key, reduced, samples, samples_sum, avg_key, stddev_key, avgvals_key, stddevvals_key)
       # Calculate Average
       weighted_sum = 0
       avg = reduced["#{key}#{avgvals_key}"]
-      avg.each_with_index do |val, i|
-        weighted_sum += (val * samples[i])
+      if avg
+        avg.each_with_index do |val, i|
+          weighted_sum += (val * samples[i])
+        end
+        reduced["#{key}#{avg_key}"] = weighted_sum / samples_sum
       end
-      reduced["#{key}#{avg_key}"] = weighted_sum / samples_sum

       # Do the STDDEV calc last so we can use the previously calculated AVG
       # See https://math.stackexchange.com/questions/1547141/aggregating-standard-deviation-to-a-summary-point
       s2 = 0
-      reduced["#{key}#{stddevvals_key}"].each_with_index do |val, i|
-        # puts "i:#{i} val:#{val} samples[i]:#{samples[i]} avg[i]:#{avg[i]}"
-        s2 += (samples[i] * avg[i]**2 + val**2)
-      end
+      stddev = reduced["#{key}#{stddevvals_key}"]
+      if stddev
+        stddev.each_with_index do |val, i|
+          # puts "i:#{i} val:#{val} samples[i]:#{samples[i]} avg[i]:#{avg[i]}"
+          s2 += (samples[i] * avg[i]**2 + val**2)
+        end

-      # Note: For very large numbers with very small deviations this sqrt can fail.
-      # If so then just set the stddev to 0.
-      begin
-        reduced["#{key}#{stddev_key}"] =
-          Math.sqrt(s2 / samples_sum - reduced["#{key}#{avg_key}"])
-      rescue Exception
-        reduced["#{key}#{stddev_key}"] = 0.0
+        # Note: For very large numbers with very small deviations this sqrt can fail.
+        # If so then just set the stddev to 0.
+        begin
+          reduced["#{key}#{stddev_key}"] =
+            Math.sqrt(s2 / samples_sum - reduced["#{key}#{avg_key}"])
+        rescue Exception
+          reduced["#{key}#{stddev_key}"] = 0.0
+        end
       end
       reduced.delete("#{key}#{avgvals_key}")
       reduced.delete("#{key}#{stddevvals_key}")
     end
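
Notes on the changes above. The central addition is CPU throttling during file reduction: process_file now constructs an OpenC3::Throttle capped at @max_cpu_utilization percent (default 30.0, configurable per microservice through the new MAX_CPU_UTILIZATION option) and calls throttle_sleep between chunks of work, including between entries in write_all_entries via its new optional throttle argument. The Throttle class itself is not part of this diff; the sketch below is only an assumption-based illustration of how a class exposing the API used here (new(max_cpu_utilization), throttle_sleep, reset_time, total_sleep_time) could behave.

  # Sketch only -- the real class lives in openc3/utilities/throttle and
  # may differ. It sleeps just long enough that time spent working stays
  # at or below max_cpu_utilization percent of elapsed wall-clock time.
  module OpenC3
    class ThrottleSketch
      attr_reader :reset_time, :total_sleep_time

      def initialize(max_cpu_utilization)
        @max_fraction = max_cpu_utilization / 100.0
        @reset_time = Time.now
        @total_sleep_time = 0.0
      end

      def throttle_sleep
        elapsed = Time.now - @reset_time
        work_time = elapsed - @total_sleep_time
        sleep_needed = (work_time / @max_fraction) - elapsed
        if sleep_needed > 0
          sleep(sleep_needed)
          @total_sleep_time += sleep_needed
        end
      end
    end
  end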
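
The new conditional require of openc3/ext/reducer_microservice, paired with the RUBY_ENGINE != 'ruby' or ENV['OPENC3_NO_EXT'] guard around update_min_stats, is the usual compiled-fast-path-with-pure-Ruby-fallback pattern: on CRuby with extensions enabled, the C extension supplies update_min_stats (presumably the hottest loop, since it runs once per packet during minute reduction) and the Ruby definition is skipped. OpenC3 keys the guard off the engine and an environment variable as shown above; a generic illustration of the same idea, with hypothetical names and keyed off a failed require instead:

  # Illustration only (not OpenC3 code): define a pure-Ruby fallback
  # when a hypothetical compiled extension cannot be loaded.
  class Stats
    begin
      require 'stats/ext' # hypothetical native fast path
    rescue LoadError
      # Pure-Ruby fallback, defined only when the extension is absent
      def sum_of_squares(values)
        values.sum { |v| v * v }
      end
    end
  end

  puts Stats.new.sum_of_squares([1.0, 2.0, 3.0]) # => 14.0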
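
update_min_stats (in either implementation) accumulates running statistics in the reduced hash under suffixed keys: __VALS, __N, and __X hold the raw samples, minimum, and maximum, while __CVALS, __CN, and __CX hold the converted equivalents. A standalone run of the same loop logic with made-up data:

  # Made-up data; mirrors the raw-value loop of update_min_stats above.
  reduced = {}
  raw_values = { 'TEMP1' => 5.0, 'TEMP2' => -1.0 }
  raw_values.each do |key, value|
    next unless value
    (reduced["#{key}__VALS"] ||= []) << value # every sample seen so far
    n_key = "#{key}__N"
    reduced[n_key] = value if !reduced[n_key] || value < reduced[n_key] # running min
    x_key = "#{key}__X"
    reduced[x_key] = value if !reduced[x_key] || value > reduced[x_key] # running max
  end
  p reduced
  # {"TEMP1__VALS"=>[5.0], "TEMP1__N"=>5.0, "TEMP1__X"=>5.0,
  #  "TEMP2__VALS"=>[-1.0], "TEMP2__N"=>-1.0, "TEMP2__X"=>-1.0}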
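
The reduce_running change is a nil guard: when a reduced entry never accumulated __AVGVALS or __STDDEVVALS arrays (for example, because no numeric samples arrived), the old code called each_with_index on nil and raised NoMethodError; the new code skips the affected calculation instead. The average being computed is a sample-count-weighted mean, per the linked math.stackexchange approach. A worked example with hypothetical numbers:

  # Two lower-level entries being rolled up: 60 samples averaging 10.0
  # and 30 samples averaging 16.0 (hypothetical values).
  samples     = [60, 30]
  samples_sum = samples.sum # 90
  avg         = [10.0, 16.0]

  weighted_sum = 0
  avg.each_with_index { |val, i| weighted_sum += (val * samples[i]) }
  puts weighted_sum / samples_sum # => 12.0, the combined average

  # With avg == nil (no numeric samples), the new `if avg` guard means
  # this whole block is skipped instead of raising NoMethodError.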