lib/pgq_prometheus/processor.rb in pgq_prometheus-0.1.0 vs lib/pgq_prometheus/processor.rb in pgq_prometheus-0.2.0

- old
+ new

@@ -1,71 +1,80 @@ +# frozen_string_literal: true + require 'prometheus_exporter/client' module PgqPrometheus class Processor class << self attr_accessor :sql_caller, :logger, :on_error - end - def self.start(client: nil, frequency: 30, labels: nil) - raise ArgumentError, "#{name}.sql_caller must be defined" if sql_caller.nil? + def start(client: nil, frequency: 30, labels: nil) + raise ArgumentError, "#{name}.sql_caller must be defined" if sql_caller.nil? - client ||= PrometheusExporter::Client.default - metric_labels = labels&.dup || {} - process_collector = new(metric_labels) + stop - stop if running? + client ||= PrometheusExporter::Client.default + metric_labels = labels&.dup || {} + process_collector = new(metric_labels) - @thread = Thread.new do - while true - begin - metrics = process_collector.collect - metrics.each do |metric| - client.send_json metric + @thread = Thread.new do + wrap_thread_loop(name) do + sql_caller.release_connection + logger&.info { "Start #{name}" } + while true + begin + metrics = process_collector.collect + metrics.each do |metric| + client.send_json metric + end + rescue => e + STDERR.puts "#{self.class} Failed To Collect Stats #{e.class} #{e.message}" + log(:error) { "#{e.class} #{e.message} #{e.backtrace.join("\n")}" } + on_error&.call(e) + end + sleep frequency end - rescue => e - STDERR.puts "#{self.class} Failed To Collect Stats #{e.class} #{e.message}" - logger&.error { "#{e.class} #{e.message} #{e.backtrace.join("\n")}" } - on_error&.call(e) end - sleep frequency end + + true end - true - end + def stop + @thread&.kill + @thread = nil + end - def self.stop - return unless running? + def running? + defined?(@thread) && @thread + end - @thread.kill - @thread = nil - end + def wrap_thread_loop(*tags) + return yield if logger.nil? || !logger.respond_to?(:tagged) - def self.running? - defined?(@thread) && @thread + logger.tagged(*tags) { yield } + end end def initialize(labels = {}) @metric_labels = labels || {} end def collect metrics = [] + sql_caller.with_connection do - within_logger_tags(self.class.name) do - - self.class.sql_caller.queue_info.each do |queue_info| + sql_caller.queue_info.each do |queue_info| queue = queue_info[:queue_name] queue_metric_opts.each do |name, opts| value = opts[:apply].call(queue_info) labels = opts[:labels].merge(queue: queue) metrics << format_metric(name, value, labels) end - self.class.sql_caller.consumer_info(queue).each do |consumer_info| + sql_caller.consumer_info(queue).each do |consumer_info| consumer = consumer_info[:consumer_name] consumer_metric_opts.each do |name, opts| value = opts[:apply].call(consumer_info, queue_info) labels = opts[:labels].merge(queue: queue, consumer: consumer) @@ -84,19 +93,17 @@ metrics end private - def logger - self.class.logger + [:sql_caller, :logger].each do |meth| + define_method(meth) { |*args, &block| self.class.public_send(meth, *args, &block) } end - def within_logger_tags(*tags) - if logger.nil? || !logger.respond_to?(:tagged) - yield - else - logger.tagged(*tags) { yield } - end + def log(severity) + return yield if logger.nil? + + logger.public_send(severity) { yield } end def queue_metric_opts Config._metrics.select { |_, opts| opts[:from] == :queue } end