lib/pgq_prometheus/processor.rb in pgq_prometheus-0.1.0 vs lib/pgq_prometheus/processor.rb in pgq_prometheus-0.2.0
- old
+ new
@@ -1,71 +1,80 @@
+# frozen_string_literal: true
+
require 'prometheus_exporter/client'
module PgqPrometheus
class Processor
class << self
attr_accessor :sql_caller, :logger, :on_error
- end
- def self.start(client: nil, frequency: 30, labels: nil)
- raise ArgumentError, "#{name}.sql_caller must be defined" if sql_caller.nil?
+ def start(client: nil, frequency: 30, labels: nil)
+ raise ArgumentError, "#{name}.sql_caller must be defined" if sql_caller.nil?
- client ||= PrometheusExporter::Client.default
- metric_labels = labels&.dup || {}
- process_collector = new(metric_labels)
+ stop
- stop if running?
+ client ||= PrometheusExporter::Client.default
+ metric_labels = labels&.dup || {}
+ process_collector = new(metric_labels)
- @thread = Thread.new do
- while true
- begin
- metrics = process_collector.collect
- metrics.each do |metric|
- client.send_json metric
+ @thread = Thread.new do
+ wrap_thread_loop(name) do
+ sql_caller.release_connection
+ logger&.info { "Start #{name}" }
+ while true
+ begin
+ metrics = process_collector.collect
+ metrics.each do |metric|
+ client.send_json metric
+ end
+ rescue => e
+ STDERR.puts "#{self.class} Failed To Collect Stats #{e.class} #{e.message}"
+ log(:error) { "#{e.class} #{e.message} #{e.backtrace.join("\n")}" }
+ on_error&.call(e)
+ end
+ sleep frequency
end
- rescue => e
- STDERR.puts "#{self.class} Failed To Collect Stats #{e.class} #{e.message}"
- logger&.error { "#{e.class} #{e.message} #{e.backtrace.join("\n")}" }
- on_error&.call(e)
end
- sleep frequency
end
+
+ true
end
- true
- end
+ def stop
+ @thread&.kill
+ @thread = nil
+ end
- def self.stop
- return unless running?
+ def running?
+ defined?(@thread) && @thread
+ end
- @thread.kill
- @thread = nil
- end
+ def wrap_thread_loop(*tags)
+ return yield if logger.nil? || !logger.respond_to?(:tagged)
- def self.running?
- defined?(@thread) && @thread
+ logger.tagged(*tags) { yield }
+ end
end
def initialize(labels = {})
@metric_labels = labels || {}
end
def collect
metrics = []
+ sql_caller.with_connection do
- within_logger_tags(self.class.name) do
-
- self.class.sql_caller.queue_info.each do |queue_info|
+ sql_caller.queue_info.each do |queue_info|
queue = queue_info[:queue_name]
queue_metric_opts.each do |name, opts|
value = opts[:apply].call(queue_info)
labels = opts[:labels].merge(queue: queue)
metrics << format_metric(name, value, labels)
end
- self.class.sql_caller.consumer_info(queue).each do |consumer_info|
+ sql_caller.consumer_info(queue).each do |consumer_info|
consumer = consumer_info[:consumer_name]
consumer_metric_opts.each do |name, opts|
value = opts[:apply].call(consumer_info, queue_info)
labels = opts[:labels].merge(queue: queue, consumer: consumer)
@@ -84,19 +93,17 @@
metrics
end
private
- def logger
- self.class.logger
+ [:sql_caller, :logger].each do |meth|
+ define_method(meth) { |*args, &block| self.class.public_send(meth, *args, &block) }
end
- def within_logger_tags(*tags)
- if logger.nil? || !logger.respond_to?(:tagged)
- yield
- else
- logger.tagged(*tags) { yield }
- end
+ def log(severity)
+ return yield if logger.nil?
+
+ logger.public_send(severity) { yield }
end
def queue_metric_opts
Config._metrics.select { |_, opts| opts[:from] == :queue }
end