lib/openc3/microservices/reducer_microservice.rb in openc3-5.1.1 vs lib/openc3/microservices/reducer_microservice.rb in openc3-5.2.0
- old
+ new
@@ -95,11 +95,11 @@
@config['options'].each do |option|
case option[0].upcase
when 'BUFFER_DEPTH' # Buffer depth to write in time order
@buffer_depth = option[1].to_i
else
- Logger.error("Unknown option passed to microservice #{@name}: #{option}")
+ @logger.error("Unknown option passed to microservice #{@name}: #{option}")
end
end
end
@buffer_depth = 10 unless @buffer_depth
@@ -127,11 +127,11 @@
# block until shutdown is called
@scheduler.join
end
def shutdown
- Logger.info("Shutting down reducer microservice: #{@name}")
+ @logger.info("Shutting down reducer microservice: #{@name}")
@scheduler.shutdown(wait: SHUTDOWN_DELAY_SECS) if @scheduler
# Make sure all the existing logs are properly closed down
threads = []
@packet_logs.each do |name, log|
@@ -191,10 +191,14 @@
end
def process_file(filename, type, entry_nanoseconds, file_nanoseconds)
file = BucketFile.new(filename)
file.retrieve
+ unless file.local_path
+ @logger.warn("Reducer Warning: #{filename}: Could not be retrieved")
+ return
+ end
# Determine if we already have a PacketLogWriter created
_, _, scope, target_name, _, rt_or_stored, _ = File.basename(filename).split('__')
stored = (rt_or_stored == "stored")
@@ -269,70 +273,90 @@
reduced = state.reduced
if type == 'minute'
# Update statistics for this packet's raw values
state.raw_values.each do |key, value|
- reduced["#{key}__VALS"] ||= []
- reduced["#{key}__VALS"] << value
- reduced["#{key}__N"] ||= value
- reduced["#{key}__N"] = value if value < reduced["#{key}__N"]
- reduced["#{key}__X"] ||= value
- reduced["#{key}__X"] = value if value > reduced["#{key}__X"]
+ if value
+ reduced["#{key}__VALS"] ||= []
+ reduced["#{key}__VALS"] << value
+ reduced["#{key}__N"] ||= value
+ reduced["#{key}__N"] = value if value < reduced["#{key}__N"]
+ reduced["#{key}__X"] ||= value
+ reduced["#{key}__X"] = value if value > reduced["#{key}__X"]
+ end
end
# Update statistics for this packet's converted values
state.converted_values.each do |key, value|
- reduced["#{key}__CVALS"] ||= []
- reduced["#{key}__CVALS"] << value
- reduced["#{key}__CN"] ||= value
- reduced["#{key}__CN"] = value if value < reduced["#{key}__CN"]
- reduced["#{key}__CX"] ||= value
- reduced["#{key}__CX"] = value if value > reduced["#{key}__CX"]
+ if value
+ reduced["#{key}__CVALS"] ||= []
+ reduced["#{key}__CVALS"] << value
+ reduced["#{key}__CN"] ||= value
+ reduced["#{key}__CN"] = value if value < reduced["#{key}__CN"]
+ reduced["#{key}__CX"] ||= value
+ reduced["#{key}__CX"] = value if value > reduced["#{key}__CX"]
+ end
end
else
# Update statistics for this packet's raw values
state.raw_max_values.each do |key, value|
- max_key = "#{key}__X"
- reduced[max_key] ||= value
- reduced[max_key] = value if value > reduced[max_key]
+ if value
+ max_key = "#{key}__X"
+ reduced[max_key] ||= value
+ reduced[max_key] = value if value > reduced[max_key]
+ end
end
state.raw_min_values.each do |key, value|
- min_key = "#{key}__N"
- reduced[min_key] ||= value
- reduced[min_key] = value if value < reduced[min_key]
+ if value
+ min_key = "#{key}__N"
+ reduced[min_key] ||= value
+ reduced[min_key] = value if value < reduced[min_key]
+ end
end
state.raw_avg_values.each do |key, value|
- avg_values_key = "#{key}__AVGVALS"
- reduced[avg_values_key] ||= []
- reduced[avg_values_key] << value
+ if value
+ avg_values_key = "#{key}__AVGVALS"
+ reduced[avg_values_key] ||= []
+ reduced[avg_values_key] << value
+ end
end
state.raw_stddev_values.each do |key, value|
- stddev_values_key = "#{key}__STDDEVVALS"
- reduced[stddev_values_key] ||= []
- reduced[stddev_values_key] << value
+ if value
+ stddev_values_key = "#{key}__STDDEVVALS"
+ reduced[stddev_values_key] ||= []
+ reduced[stddev_values_key] << value
+ end
end
# Update statistics for this packet's converted values
state.converted_max_values.each do |key, value|
- max_key = "#{key}__CX"
- reduced[max_key] ||= value
- reduced[max_key] = value if value > reduced[max_key]
+ if value
+ max_key = "#{key}__CX"
+ reduced[max_key] ||= value
+ reduced[max_key] = value if value > reduced[max_key]
+ end
end
state.converted_min_values.each do |key, value|
- min_key = "#{key}__CN"
- reduced[min_key] ||= value
- reduced[min_key] = value if value < reduced[min_key]
+ if value
+ min_key = "#{key}__CN"
+ reduced[min_key] ||= value
+ reduced[min_key] = value if value < reduced[min_key]
+ end
end
state.converted_avg_values.each do |key, value|
- avg_values_key = "#{key}__CAVGVALS"
- reduced[avg_values_key] ||= []
- reduced[avg_values_key] << value
+ if value
+ avg_values_key = "#{key}__CAVGVALS"
+ reduced[avg_values_key] ||= []
+ reduced[avg_values_key] << value
+ end
end
state.converted_stddev_values.each do |key, value|
- stddev_values_key = "#{key}__CSTDDEVVALS"
- reduced[stddev_values_key] ||= []
- reduced[stddev_values_key] << value
+ if value
+ stddev_values_key = "#{key}__CSTDDEVVALS"
+ reduced[stddev_values_key] ||= []
+ reduced[stddev_values_key] << value
+ end
end
reduced["_NUM_SAMPLES__VALS"] ||= []
reduced["_NUM_SAMPLES__VALS"] << packet.read('_NUM_SAMPLES')
end
@@ -343,12 +367,12 @@
write_all_entries(reducer_state, plw, type, target_name, stored)
true
rescue => e
if file.local_path and File.exist?(file.local_path)
- Logger.error("Reducer Error: #{filename}: #{File.size(file.local_path)} bytes: \n#{e.formatted}")
+ @logger.error("Reducer Error: #{filename}: #{File.size(file.local_path)} bytes: \n#{e.formatted}")
else
- Logger.error("Reducer Error: #{filename}: \n#{e.formatted}")
+ @logger.error("Reducer Error: #{filename}: \n#{e.formatted}")
end
false
end
def check_new_file(reducer_state, plw, type, target_name, stored, current_time, file_nanoseconds)