require 'prometheus/client/helper/json_parser'
require 'fast_mmaped_file'

module Prometheus
  module Client
    module Helper
      # Mixin that parses the binary entry layout of a multiprocess metrics
      # file and folds the entries into metric hashes.
      #
      # NOTE(review): the including class must provide `slice`, `size`,
      # `filepath` and `data` (presumably an mmap-backed file object) -- this
      # contract is inferred from the otherwise-undefined calls below; confirm
      # against the includer.
      module EntryParser
        # Raised when an entry in the file cannot be decoded.
        class ParsingError < RuntimeError; end

        # A file smaller than this cannot even hold the `used` header word.
        MINIMUM_SIZE = 8
        # First entry starts after the 4-byte `used` counter plus padding.
        START_POSITION = 8

        # Number of bytes in use, stored as a native-endian signed 32-bit
        # integer in the first four bytes of the file.
        def used
          slice(0..3).unpack('l')[0]
        end

        # Filename components of "<type>_<mode>_<pid...>.db", with any
        # trailing "-<number>" suffix stripped from each part. Memoized.
        def parts
          @parts ||= File.basename(filepath, '.db')
                         .split('_')
                         .map { |e| e.gsub(/-\d+$/, '') } # remove trailing -number
        end

        # Metric type encoded in the filename (e.g. :gauge, :counter).
        def type
          parts[0].to_sym
        end

        # Process identifier portion of the filename (everything after the
        # type and multiprocess-mode parts, re-joined with underscores).
        def pid
          parts[2..-1].join('_')
        end

        # Gauge aggregation mode encoded in the filename
        # (e.g. 'min', 'max', 'livesum', 'all', 'liveall').
        def multiprocess_mode
          parts[1]
        end

        # True when the file is too small to contain a header, or the header
        # reports zero bytes in use.
        def empty?
          size < MINIMUM_SIZE || used.zero?
        end

        # Lazily enumerates raw entries as [data, encoded_len, value_offset, pos]
        # tuples without decoding them. Per-entry layout:
        #   4-byte length | <length> bytes of encoded key | pad to 8 | 8-byte double
        def entries
          return Enumerator.new {} if empty?

          Enumerator.new do |yielder|
            used_ = used # cache used to avoid unnecessary unpack operations
            pos = START_POSITION # used + padding offset

            while pos < used_
              data = slice(pos..-1)
              encoded_len, first_encoded_bytes = data.unpack('ll')

              # Skip zeroed 8-byte gaps rather than parsing empty data.
              if encoded_len.zero? || first_encoded_bytes.zero?
                pos += 8
                next
              end

              padding_len = 8 - (encoded_len + 4) % 8
              value_offset = 4 + encoded_len + padding_len

              pos += value_offset
              yielder.yield data, encoded_len, value_offset, pos
              pos += 8 # skip the 8-byte double value
            end
          end
        end

        # Decodes each raw entry into an [encoded_key, value] pair.
        #
        # When `ignore_errors` is true, undecodable entries are logged and
        # dropped; otherwise a ParsingError is raised.
        def parsed_entries(ignore_errors = false)
          result = entries.map do |data, encoded_len, value_offset, _|
            begin
              encoded, value = data.unpack(format('@4A%d@%dd', encoded_len, value_offset))
              [encoded, value]
            rescue ArgumentError => e
              Prometheus::Client.logger.debug("Error processing data: #{bin_to_hex(data[0, 7])} len: #{encoded_len} value_offset: #{value_offset}")
              # BUG FIX: `raise ParsingError(e)` called a nonexistent method and
              # surfaced as NoMethodError instead of the intended ParsingError.
              raise ParsingError, e.message unless ignore_errors
            end
          end

          # BUG FIX: the original used non-destructive `reject` and discarded
          # its return value, so nil placeholders from ignored errors leaked
          # into the returned array.
          result.compact! if ignore_errors
          result
        end

        # Aggregates the file's entries into `metrics`, a hash keyed by metric
        # name. Delegates the low-level entry walk to the C extension.
        #
        # When `ignore_errors` is true, entries whose key fails JSON parsing
        # are skipped; otherwise a ParsingError is raised.
        def to_metrics(metrics = {}, ignore_errors = false)
          FastMmapedFile.fast_entries(data) do |metric_name, name, labels, value|
            begin
              metric = metrics.fetch(metric_name,
                                     metric_name: metric_name,
                                     help: 'Multiprocess metric',
                                     type: type,
                                     samples: {})

              if type == :gauge
                metric[:multiprocess_mode] = multiprocess_mode
                merge_samples(name, labels, value, metric[:samples])
              else
                merge_samples(name, labels, value, metric[:samples])
                # The duplicates and labels are fixed in the next for.
              end

              metrics[metric_name] = metric
            rescue JSON::ParserError => e
              # BUG FIX: `raise ParsingError(e)` called a nonexistent method and
              # surfaced as NoMethodError instead of the intended ParsingError.
              raise ParsingError, e.message unless ignore_errors
            end
          end

          # BUG FIX: the original called non-destructive `reject` and discarded
          # its return value; it also passed the [key, value] pair as a single
          # block arg, so the nil check could never match. Drop nil metrics
          # in place instead.
          metrics.reject! { |_name, metric| metric.nil? } if ignore_errors
          metrics
        end

        # Smaller of two comparable values.
        def min(a, b)
          a < b ? a : b
        end

        # Larger of two comparable values.
        def max(a, b)
          a > b ? a : b
        end

        # Merges one sample into `samples` according to the metric type and,
        # for gauges, the multiprocess aggregation mode.
        #
        # NOTE: `labels.merge!` mutates the caller's labels hash to build the
        # sample key; callers must not reuse the hash afterwards.
        def merge_samples(name, labels, value, samples)
          samples ||= {}
          case type
          when :gauge
            case multiprocess_mode
            when 'min'
              key = labels.merge!(__key_name: name)
              s = samples.fetch(key, value)
              samples[key] = min(s, value)
            when 'max'
              key = labels.merge!(__key_name: name)
              s = samples.fetch(key, value)
              samples[key] = max(s, value)
            when 'livesum'
              key = labels.merge!(__key_name: name)
              s = samples.fetch(key, 0.0)
              samples[key] = s + value
            else # all/liveall: keep one sample per writing process
              key = labels.merge!(pid: pid, __key_name: name)
              samples[key] = value
            end
          else
            # Counter, Histogram and Summary: sum across processes.
            key = labels.merge!(__key_name: name)
            s = samples.fetch(key, 0.0)
            samples[key] = s + value
          end
          samples
        end

        private

        # Hex dump of a binary string for debug logging.
        # NOTE(review): bytes are not zero-padded (0x0a renders as "a"), so
        # the output is ambiguous -- kept as-is to preserve log format.
        def bin_to_hex(s)
          s.each_byte.map { |b| b.to_s(16) }.join
        end
      end
    end
  end
end