lib/openc3/microservices/reducer_microservice.rb in openc3-5.10.1 vs lib/openc3/microservices/reducer_microservice.rb in openc3-5.11.0
- old
+ new
@@ -23,12 +23,14 @@
require 'openc3/microservices/microservice'
require 'openc3/topics/topic'
require 'openc3/topics/telemetry_reduced_topics'
require 'openc3/packets/json_packet'
require 'openc3/utilities/bucket_file_cache'
+require 'openc3/utilities/throttle'
require 'openc3/models/reducer_model'
require 'openc3/logs/buffered_packet_log_writer'
+require 'openc3/ext/reducer_microservice' if RUBY_ENGINE == 'ruby' and !ENV['OPENC3_NO_EXT']
require 'rufus-scheduler'
module OpenC3
class ReducerState
attr_accessor :reduced
@@ -94,17 +96,20 @@
if @config['options']
@config['options'].each do |option|
case option[0].upcase
when 'BUFFER_DEPTH' # Buffer depth to write in time order
@buffer_depth = option[1].to_i
+ when 'MAX_CPU_UTILIZATION'
+ @max_cpu_utilization = Float(option[1])
else
@logger.error("Unknown option passed to microservice #{@name}: #{option}")
end
end
end
@buffer_depth = 10 unless @buffer_depth
+ @max_cpu_utilization = 30.0 unless @max_cpu_utilization
@target_name = name.split('__')[-1]
@packet_logs = {}
@error_count = 0
@metric.set(name: 'reducer_total', value: @count, type: 'counter')
@@ -187,16 +192,18 @@
end
end
end
def process_file(filename, type, entry_nanoseconds, file_nanoseconds)
+ throttle = OpenC3::Throttle.new(@max_cpu_utilization)
file = BucketFile.new(filename)
file.retrieve
unless file.local_path
@logger.warn("Reducer Warning: #{filename}: Could not be retrieved")
return
end
+ throttle.throttle_sleep
# Determine if we already have a PacketLogWriter created
_, _, scope, target_name, _, rt_or_stored, _ = File.basename(filename).split('__')
stored = (rt_or_stored == "stored")
@@ -215,10 +222,11 @@
end
# The lifetime of all these variables is a single file - single target / multiple packets
reducer_state = {}
plr = OpenC3::PacketLogReader.new
+ throttle.throttle_sleep
plr.each(file.local_path) do |packet|
# Check to see if we should start a new log file before processing this packet
current_time = packet.packet_time.to_nsec_from_epoch
check_new_file(reducer_state, plw, type, target_name, stored, current_time, file_nanoseconds)
state = setup_state(reducer_state, packet, current_time)
@@ -237,136 +245,32 @@
state = setup_state(reducer_state, packet, current_time)
end
end
if type == 'minute'
- state.entry_samples ||= packet.json_hash.dup # Grab all the samples from the first packet
- if state.first
- state.raw_values = packet.read_all(:RAW, nil, packet.read_all_names(:RAW)).select { |key, value| value.is_a?(Numeric) }
- state.raw_keys ||= state.raw_values.keys
- state.converted_values = packet.read_all(:CONVERTED, nil, packet.read_all_names(:CONVERTED)).select { |key, value| value.is_a?(Numeric) }
- state.converted_keys ||= state.converted_values.keys
- else
- state.raw_values = packet.read_all(:RAW, nil, state.raw_keys).select { |key, value| value.is_a?(Numeric) }
- state.converted_values = packet.read_all(:CONVERTED, nil, state.converted_keys).select { |key, value| value.is_a?(Numeric) }
- end
+ get_min_samples(packet, state)
else
- # Hour or Day
- state.entry_samples ||= extract_entry_samples(packet)
- if state.first
- state.raw_max_values = packet.read_all(:RAW, :MAX, packet.read_all_names(:RAW, :MAX))
- state.raw_keys = state.raw_max_values.keys
- state.converted_max_values = packet.read_all(:CONVERTED, :MAX, packet.read_all_names(:CONVERTED, :MAX))
- state.converted_keys = state.converted_max_values.keys
- else
- state.raw_max_values = packet.read_all(:RAW, :MAX, state.raw_keys)
- state.converted_max_values = packet.read_all(:CONVERTED, :MAX, state.converted_keys)
- end
- state.raw_min_values = packet.read_all(:RAW, :MIN, state.raw_keys)
- state.raw_avg_values = packet.read_all(:RAW, :AVG, state.raw_keys)
- state.raw_stddev_values = packet.read_all(:RAW, :STDDEV, state.raw_keys)
- state.converted_min_values = packet.read_all(:CONVERTED, :MIN, state.converted_keys)
- state.converted_avg_values = packet.read_all(:CONVERTED, :AVG, state.converted_keys)
- state.converted_stddev_values = packet.read_all(:CONVERTED, :STDDEV, state.converted_keys)
+ get_hour_day_samples(packet, state)
end
reduced = state.reduced
if type == 'minute'
- # Update statistics for this packet's raw values
- state.raw_values.each do |key, value|
- if value
- reduced["#{key}__VALS"] ||= []
- reduced["#{key}__VALS"] << value
- reduced["#{key}__N"] ||= value
- reduced["#{key}__N"] = value if value < reduced["#{key}__N"]
- reduced["#{key}__X"] ||= value
- reduced["#{key}__X"] = value if value > reduced["#{key}__X"]
- end
- end
-
- # Update statistics for this packet's converted values
- state.converted_values.each do |key, value|
- if value
- reduced["#{key}__CVALS"] ||= []
- reduced["#{key}__CVALS"] << value
- reduced["#{key}__CN"] ||= value
- reduced["#{key}__CN"] = value if value < reduced["#{key}__CN"]
- reduced["#{key}__CX"] ||= value
- reduced["#{key}__CX"] = value if value > reduced["#{key}__CX"]
- end
- end
+ update_min_stats(reduced, state)
else
- # Update statistics for this packet's raw values
- state.raw_max_values.each do |key, value|
- if value
- max_key = "#{key}__X"
- reduced[max_key] ||= value
- reduced[max_key] = value if value > reduced[max_key]
- end
- end
- state.raw_min_values.each do |key, value|
- if value
- min_key = "#{key}__N"
- reduced[min_key] ||= value
- reduced[min_key] = value if value < reduced[min_key]
- end
- end
- state.raw_avg_values.each do |key, value|
- if value
- avg_values_key = "#{key}__AVGVALS"
- reduced[avg_values_key] ||= []
- reduced[avg_values_key] << value
- end
- end
- state.raw_stddev_values.each do |key, value|
- if value
- stddev_values_key = "#{key}__STDDEVVALS"
- reduced[stddev_values_key] ||= []
- reduced[stddev_values_key] << value
- end
- end
-
- # Update statistics for this packet's converted values
- state.converted_max_values.each do |key, value|
- if value
- max_key = "#{key}__CX"
- reduced[max_key] ||= value
- reduced[max_key] = value if value > reduced[max_key]
- end
- end
- state.converted_min_values.each do |key, value|
- if value
- min_key = "#{key}__CN"
- reduced[min_key] ||= value
- reduced[min_key] = value if value < reduced[min_key]
- end
- end
- state.converted_avg_values.each do |key, value|
- if value
- avg_values_key = "#{key}__CAVGVALS"
- reduced[avg_values_key] ||= []
- reduced[avg_values_key] << value
- end
- end
- state.converted_stddev_values.each do |key, value|
- if value
- stddev_values_key = "#{key}__CSTDDEVVALS"
- reduced[stddev_values_key] ||= []
- reduced[stddev_values_key] << value
- end
- end
-
- reduced["_NUM_SAMPLES__VALS"] ||= []
- reduced["_NUM_SAMPLES__VALS"] << packet.read('_NUM_SAMPLES')
+ update_raw_hour_day_stats(reduced, state)
+ update_converted_hour_day_stats(packet, reduced, state)
end
-
state.first = false
+
+ throttle.throttle_sleep
end
file.delete # Remove the local copy
- write_all_entries(reducer_state, plw, type, target_name, stored)
+ write_all_entries(reducer_state, plw, type, target_name, stored, throttle)
+ @logger.debug("Reducer Throttle: #{filename}: total_time: #{Time.now - throttle.reset_time}, sleep_time: #{throttle.total_sleep_time}")
+
@count += 1
@metric.set(name: 'reducer_total', value: @count, type: 'counter')
true
rescue => e
@@ -378,10 +282,144 @@
@error_count += 1
@metric.set(name: 'reducer_error_total', value: @error_count, type: 'counter')
false
end
+def get_min_samples(packet, state) # Minute reduction: capture this packet's numeric RAW and CONVERTED values in state
+ state.entry_samples ||= packet.json_hash.dup # Grab all the samples from the first packet
+ if state.first # process_file sets state.first to false once a packet has been handled
+ state.raw_values = packet.read_all(:RAW, nil, packet.read_all_names(:RAW)).select { |key, value| value.is_a?(Numeric) } # Read every RAW name the packet reports, keep only Numeric values
+ state.raw_keys ||= state.raw_values.keys # Remember the numeric keys; ||= keeps a key list that is already set
+ state.converted_values = packet.read_all(:CONVERTED, nil, packet.read_all_names(:CONVERTED)).select { |key, value| value.is_a?(Numeric) } # Same treatment for CONVERTED values
+ state.converted_keys ||= state.converted_values.keys
+ else # Later packets: read only the keys remembered in state
+ state.raw_values = packet.read_all(:RAW, nil, state.raw_keys).select { |key, value| value.is_a?(Numeric) }
+ state.converted_values = packet.read_all(:CONVERTED, nil, state.converted_keys).select { |key, value| value.is_a?(Numeric) }
+ end
+ end
+
+ def get_hour_day_samples(packet, state) # Non-minute reduction: load the MAX/MIN/AVG/STDDEV readings of this packet into state
+ # Hour or Day
+ state.entry_samples ||= extract_entry_samples(packet) # Only assigned while entry_samples is still unset
+ if state.first # process_file sets state.first to false once a packet has been handled
+ state.raw_max_values = packet.read_all(:RAW, :MAX, packet.read_all_names(:RAW, :MAX)) # Read every RAW/MAX name the packet reports
+ state.raw_keys = state.raw_max_values.keys # These keys drive all later RAW reads below
+ state.converted_max_values = packet.read_all(:CONVERTED, :MAX, packet.read_all_names(:CONVERTED, :MAX))
+ state.converted_keys = state.converted_max_values.keys # These keys drive all later CONVERTED reads below
+ else # Later packets: reuse the key lists stored in state
+ state.raw_max_values = packet.read_all(:RAW, :MAX, state.raw_keys)
+ state.converted_max_values = packet.read_all(:CONVERTED, :MAX, state.converted_keys)
+ end
+ state.raw_min_values = packet.read_all(:RAW, :MIN, state.raw_keys) # MIN/AVG/STDDEV are always read with the stored key lists
+ state.raw_avg_values = packet.read_all(:RAW, :AVG, state.raw_keys)
+ state.raw_stddev_values = packet.read_all(:RAW, :STDDEV, state.raw_keys)
+ state.converted_min_values = packet.read_all(:CONVERTED, :MIN, state.converted_keys)
+ state.converted_avg_values = packet.read_all(:CONVERTED, :AVG, state.converted_keys)
+ state.converted_stddev_values = packet.read_all(:CONVERTED, :STDDEV, state.converted_keys)
+ end
+
+ if RUBY_ENGINE != 'ruby' or ENV['OPENC3_NO_EXT'] # Negation of the condition guarding require 'openc3/ext/reducer_microservice'; presumably the extension supplies update_min_stats otherwise - confirm
+ def update_min_stats(reduced, state) # Fold the current packet's values from state into the reduced hash (minute reduction)
+ # Update statistics for this packet's raw values
+ state.raw_values.each do |key, value|
+ if value # Skips nil and false values
+ vals_key = "#{key}__VALS"
+ reduced[vals_key] ||= [] # List of every value seen for this key
+ reduced[vals_key] << value
+ n_key = "#{key}__N"
+ reduced[n_key] ||= value # Running minimum, seeded with the first value
+ reduced[n_key] = value if value < reduced[n_key]
+ x_key = "#{key}__X"
+ reduced[x_key] ||= value # Running maximum, seeded with the first value
+ reduced[x_key] = value if value > reduced[x_key]
+ end
+ end
+
+ # Update statistics for this packet's converted values
+ state.converted_values.each do |key, value|
+ if value # Skips nil and false values
+ cvals_key = "#{key}__CVALS"
+ reduced[cvals_key] ||= [] # List of every converted value seen for this key
+ reduced[cvals_key] << value
+ cn_key = "#{key}__CN"
+ reduced[cn_key] ||= value # Running minimum of converted values
+ reduced[cn_key] = value if value < reduced[cn_key]
+ cx_key = "#{key}__CX"
+ reduced[cx_key] ||= value # Running maximum of converted values
+ reduced[cx_key] = value if value > reduced[cx_key]
+ end
+ end
+ end
+ end
+
+ def update_raw_hour_day_stats(reduced, state) # Fold the RAW max/min/avg/stddev readings held in state into the reduced hash
+ # Update statistics for this packet's raw values
+ state.raw_max_values.each do |key, value|
+ if value # Skips nil and false values
+ max_key = "#{key}__X"
+ reduced[max_key] ||= value # Running maximum, seeded with the first value
+ reduced[max_key] = value if value > reduced[max_key]
+ end
+ end
+ state.raw_min_values.each do |key, value|
+ if value
+ min_key = "#{key}__N"
+ reduced[min_key] ||= value # Running minimum, seeded with the first value
+ reduced[min_key] = value if value < reduced[min_key]
+ end
+ end
+ state.raw_avg_values.each do |key, value|
+ if value
+ avg_values_key = "#{key}__AVGVALS"
+ reduced[avg_values_key] ||= [] # Averages are collected in a list rather than combined here
+ reduced[avg_values_key] << value
+ end
+ end
+ state.raw_stddev_values.each do |key, value|
+ if value
+ stddev_values_key = "#{key}__STDDEVVALS"
+ reduced[stddev_values_key] ||= [] # Standard deviations are collected in a list rather than combined here
+ reduced[stddev_values_key] << value
+ end
+ end
+ end
+
+ def update_converted_hour_day_stats(packet, reduced, state) # Fold the CONVERTED max/min/avg/stddev readings into reduced and record the packet's sample count
+ # Update statistics for this packet's converted values
+ state.converted_max_values.each do |key, value|
+ if value # Skips nil and false values
+ max_key = "#{key}__CX"
+ reduced[max_key] ||= value # Running maximum, seeded with the first value
+ reduced[max_key] = value if value > reduced[max_key]
+ end
+ end
+ state.converted_min_values.each do |key, value|
+ if value
+ min_key = "#{key}__CN"
+ reduced[min_key] ||= value # Running minimum, seeded with the first value
+ reduced[min_key] = value if value < reduced[min_key]
+ end
+ end
+ state.converted_avg_values.each do |key, value|
+ if value
+ avg_values_key = "#{key}__CAVGVALS"
+ reduced[avg_values_key] ||= [] # Averages are collected in a list rather than combined here
+ reduced[avg_values_key] << value
+ end
+ end
+ state.converted_stddev_values.each do |key, value|
+ if value
+ stddev_values_key = "#{key}__CSTDDEVVALS"
+ reduced[stddev_values_key] ||= [] # Standard deviations are collected in a list rather than combined here
+ reduced[stddev_values_key] << value
+ end
+ end
+
+ reduced["_NUM_SAMPLES__VALS"] ||= [] # One entry per packet; this is the only use of the packet argument
+ reduced["_NUM_SAMPLES__VALS"] << packet.read('_NUM_SAMPLES')
+ end
+
def check_new_file(reducer_state, plw, type, target_name, stored, current_time, file_nanoseconds)
plw_first_time_nsec = plw.buffered_first_time_nsec
if plw_first_time_nsec && ((current_time - plw_first_time_nsec) >= file_nanoseconds)
# Write out all entries in progress
write_all_entries(reducer_state, plw, type, target_name, stored)
@@ -406,13 +444,14 @@
state.current_time = current_time
state.entry_time ||= state.current_time # Sets the entry time from the first packet
return state
end
- def write_all_entries(reducer_state, plw, type, target_name, stored)
+ def write_all_entries(reducer_state, plw, type, target_name, stored, throttle = nil) # Write one entry per packet state; throttle is optional (check_new_file calls this without one)
reducer_state.each do |packet_name, state| # reducer_state maps packet name to its state object
write_entry(state, plw, type, target_name, packet_name, stored)
+ throttle.throttle_sleep if throttle # Invoke the throttle after each entry, only when one was supplied
end
end
def write_entry(state, plw, type, target_name, packet_name, stored)
return unless state.reduced.length > 0
@@ -482,29 +521,34 @@
def reduce_running(key, reduced, samples, samples_sum, avg_key, stddev_key, avgvals_key, stddevvals_key) # Collapse the average and stddev lists stored for key in reduced into single values, then drop the lists
# Calculate Average
weighted_sum = 0
avg = reduced["#{key}#{avgvals_key}"] # May be nil when no averages were collected for this key
- avg.each_with_index do |val, i|
- weighted_sum += (val * samples[i])
+ if avg # New in this version: skip the average when the list is missing
+ avg.each_with_index do |val, i|
+ weighted_sum += (val * samples[i]) # Weight each average by the sample count at the same index
+ end
+ reduced["#{key}#{avg_key}"] = weighted_sum / samples_sum
end
- reduced["#{key}#{avg_key}"] = weighted_sum / samples_sum
# Do the STDDEV calc last so we can use the previously calculated AVG
# See https://math.stackexchange.com/questions/1547141/aggregating-standard-deviation-to-a-summary-point
s2 = 0
- reduced["#{key}#{stddevvals_key}"].each_with_index do |val, i|
- # puts "i:#{i} val:#{val} samples[i]:#{samples[i]} avg[i]:#{avg[i]}"
- s2 += (samples[i] * avg[i]**2 + val**2)
- end
+ stddev = reduced["#{key}#{stddevvals_key}"] # May be nil when no standard deviations were collected for this key
+ if stddev # New in this version: skip the stddev when the list is missing
+ stddev.each_with_index do |val, i|
+ # puts "i:#{i} val:#{val} samples[i]:#{samples[i]} avg[i]:#{avg[i]}"
+ s2 += (samples[i] * avg[i]**2 + val**2) # NOTE(review): avg is not nil-checked in this branch, and only the avg**2 term is weighted by samples[i] - confirm both are intended
+ end
- # Note: For very large numbers with very small deviations this sqrt can fail.
- # If so then just set the stddev to 0.
- begin
- reduced["#{key}#{stddev_key}"] =
- Math.sqrt(s2 / samples_sum - reduced["#{key}#{avg_key}"])
- rescue Exception
- reduced["#{key}#{stddev_key}"] = 0.0
+ # Note: For very large numbers with very small deviations this sqrt can fail.
+ # If so then just set the stddev to 0.
+ begin
+ reduced["#{key}#{stddev_key}"] =
+ Math.sqrt(s2 / samples_sum - reduced["#{key}#{avg_key}"]) # NOTE(review): a combined variance normally subtracts the mean squared - confirm subtracting the plain average is intended
+ rescue Exception # NOTE(review): also traps non-StandardError exceptions such as Interrupt and SystemExit; StandardError may be enough - confirm
+ reduced["#{key}#{stddev_key}"] = 0.0
+ end
end
reduced.delete("#{key}#{avgvals_key}") # The intermediate lists are removed from reduced
reduced.delete("#{key}#{stddevvals_key}")
end