# frozen_string_literal: true

module Tobox
  module Plugins
    # Plugin which periodically hands event statistics to user-registered callbacks.
    #
    # A dedicated background thread wakes up every +stats_interval_seconds+ and calls each
    # callback registered via +on_stats+.
    module Stats
      # Configuration-level API added by this plugin.
      module ConfigurationMethods
        # Interval (in seconds) between two stats emissions, as given to the last #on_stats call.
        attr_reader :stats_interval_seconds

        # Registers a stats callback and records the emission interval.
        #
        # The interval is overwritten on each call, so the value of the last call wins.
        #
        # @param stats_interval_seconds interval handed to +sleep+ between emissions.
        # @param callback block invoked as +callback.call(stats, db)+ on every emission.
        # @return [self]
        def on_stats(stats_interval_seconds, &callback)
          @stats_interval_seconds = stats_interval_seconds

          (@lifecycle_events[:stats] ||= []) << callback
          self
        end
      end

      # Owns the background thread and the database connection used to collect stats.
      class StatsEmitter
        # @param config the configuration object the emitter reads its settings from.
        def initialize(config)
          @config = config
          @running = false
        end

        # Opens a dedicated single-connection database handle, prepares the datasets used
        # for stats collection and spawns the "outbox-stats" thread.
        #
        # Does nothing when already running, or when no stats handler was registered.
        def start
          return if @running

          config = @config

          plugins = config.plugins.map(&:name)

          interval = config.stats_interval_seconds
          @stats_handlers = Array(config.lifecycle_events[:stats])

          return if @stats_handlers.empty?

          @error_handlers = Array(config.lifecycle_events[:error_worker])

          @max_attempts = config[:max_attempts]

          @created_at_column = config[:created_at_column]
          @db = Sequel.connect(config.database.opts.merge(max_connections: 1))
          @db.loggers = config.database.loggers
          Array(config.lifecycle_events[:database_connect]).each { |cb| cb.call(@db) }

          outbox_table = config[:table]
          @outbox_ds = @db[outbox_table]

          if plugins.include?("Tobox::Plugins::Inbox")
            inbox_table = config[:inbox_table]
            @inbox_ds = @db[inbox_table]
          end

          if @created_at_column
            # discard already handled events
            #
            @oldest_event_age_ds = @outbox_ds.where(last_error: nil)
            @oldest_event_age_ds = if config.visibility_type_bool?
                                     @oldest_event_age_ds.where(config[:visibility_column] => false)
                                   else
                                     @oldest_event_age_ds.where(config[:visibility_column] => nil)
                                   end
            @oldest_event_age_ds = @oldest_event_age_ds.order(Sequel.asc(:id))
          end

          logger = config.default_logger

          # Handlers receive the bound method; the singleton alias lets them call
          # +stats.collect+ in addition to +stats.call+.
          stats = method(:collect_event_stats)
          stats.instance_eval do
            alias collect call
          end

          # The flag must be raised BEFORE the thread is spawned: the loop below checks it
          # after each emission, and with a very short interval it could otherwise observe
          # the initial +false+ and exit right after the first iteration.
          @running = true

          @th = Thread.start do
            Thread.current.name = "outbox-stats"

            loop do
              logger.debug { "stats worker: sleep for #{interval}s..." }
              sleep interval

              begin
                emit_event_stats(stats)
              rescue StandardError => e
                # StandardError rather than RuntimeError: errors raised while collecting
                # (e.g. database errors) are not RuntimeError subclasses, and would
                # otherwise kill this thread without ever reaching the error handlers.
                @error_handlers.each { |hd| hd.call(e) }
              end

              break unless @running
            end
          end
        end

        # Terminates the stats thread and closes the dedicated database connection.
        #
        # Does nothing when the emitter is not running.
        def stop
          return unless @running

          @th.terminate

          @db.disconnect

          @running = false
        end

        private

        # Invokes every registered stats handler with the stats collector and the
        # emitter's database handle.
        def emit_event_stats(stats)
          @stats_handlers.each do |hd|
            hd.call(stats, @db)
          end
        end

        # Builds the stats hash handed to callbacks (via +stats.collect+ / +stats.call+).
        #
        # Outbox rows are counted per status:
        # * +:pending_count+: rows whose +last_error+ is NULL;
        # * +:failing_count+: other rows whose attempts column is below +max_attempts+;
        # * +:failed_count+: all remaining rows.
        #
        # Additionally sets +:inbox_count+ when the inbox dataset exists, and
        # +:oldest_event_age_in_seconds+ when a created-at column is configured.
        #
        # @return [Hash{Symbol => Integer}]
        def collect_event_stats
          stats = @outbox_ds.group_and_count(
            Sequel.case([
                          [{ last_error: nil }, "pending_count"],
                          [Sequel.expr(@config[:attempts_column]) < @max_attempts, "failing_count"]
                        ],
                        "failed_count").as(:status)
          )
          stats = stats.as_hash(:status, :count).transform_keys(&:to_sym)

          # fill it in: statuses with no rows are absent from the grouped result
          stats[:pending_count] ||= 0
          stats[:failing_count] ||= 0
          stats[:failed_count] ||= 0

          stats[:inbox_count] = @inbox_ds.count if @inbox_ds

          if @oldest_event_age_ds
            # first row by ascending id among the not-yet-handled, error-free events
            created_at = @oldest_event_age_ds.get(@created_at_column)
            age = created_at ? (Time.now - created_at).to_i : 0
            stats[:oldest_event_age_in_seconds] = age
          end

          stats
        end
      end

      class << self
        # Plugin hook: ties a StatsEmitter to the start/stop lifecycle of the configuration.
        def configure(config)
          emitter = StatsEmitter.new(config)

          config.on_start(&emitter.method(:start))
          config.on_stop(&emitter.method(:stop))
        end
      end
    end

    register_plugin :stats, Stats
  end
end