# frozen_string_literal: true require "rufus/scheduler" require "net/http" require "json" require_relative "../sampler/rules_sampler" require_relative "../sampler/message_process_sampler" module Aspecto # Aspecto's OpenTelemetry distribution module OpenTelemetry module Config # Handle fetching of remote configuration from aspecto class RemoteConfig def initialize(aspecto_auth, service_name, env, fallback_sampler) @service_name = service_name @env = env @error_reported = false @got_remote_config = false @fallback_sampler = fallback_sampler aspecto_config_host = ENV.fetch("ASPECTO_CONFIG_HOST", "https://config.aspecto.io") @aspecto_config_url = URI("#{aspecto_config_host}/config/#{aspecto_auth}") init_http_client @scheduler = Rufus::Scheduler.new @remote_config_poll_frequency = ENV.fetch("ASPECTO_REMOTE_CONFIG_POLL_FREQUENCY", "30s") @scheduler.interval @remote_config_poll_frequency, first: :now do update_config end ::OpenTelemetry.logger.info "[Aspecto] initialized remote config polling" rescue StandardError => e ::OpenTelemetry.logger.error "[Aspecto] failed to initialize remote config polling" ::OpenTelemetry.logger.error e end def shutdown @scheduler.shutdown end private def init_http_client write_timeout_supported = Gem::Version.new(RUBY_VERSION) >= Gem::Version.new("2.6") @http_client = Net::HTTP.new(@aspecto_config_url.host, @aspecto_config_url.port) @http_client.read_timeout = 10 @http_client.open_timeout = 10 @http_client.write_timeout = 10 if write_timeout_supported @http_client.max_retries = 0 # use uri.scheme == 'https' instead @http_client.use_ssl = true @http_client.verify_mode = OpenSSL::SSL::VERIFY_PEER end def update_config ::OpenTelemetry::Common::Utilities.untraced do request = Net::HTTP::Get.new(@aspecto_config_url.path) request["If-None-Match"] = @latest_config_etag unless @latest_config_etag.nil? response = @http_client.request(request) response_code = response.code.to_i return if response_code == 304 if response_code >= 400 log_config_error "failed to get remote config with http response #{response_code}." return end @latest_config_etag = response["etag"] handle_new_config JSON.parse(response.body) if response_code < 300 end rescue StandardError => e log_config_error "updating remote config failed with exception.", e end def handle_new_config(config) update_sampler config["samplingRules"] @error_reported = false @got_remote_config = true ::OpenTelemetry.logger.info("[Aspecto] successfully updated remote configuration") end def update_sampler(sampler_config) rules_sampler = Sampler::RulesSampler.new sampler_config, @fallback_sampler, @service_name, @env service_sampler = ::OpenTelemetry::SDK::Trace::Samplers.parent_based(root: rules_sampler) message_process_sampler = Sampler::MessageProcessSampler.new rules_sampler, service_sampler # updating the sampler should be thread safe as it's an atomic setter on a global value ::OpenTelemetry.tracer_provider.sampler = message_process_sampler end def log_config_error(msg, err = nil) return if @error_reported # report every issue only once retry_msg = "will try again until success every #{@remote_config_poll_frequency}." previous_config_msg = @got_remote_config ? "using previous remote config" : "no previous config is avialable" ::OpenTelemetry.logger.info("[Aspecto]: #{msg} #{retry_msg} #{previous_config_msg}") ::OpenTelemetry.logger.info(err) if err @error_reported = true end end end end end