# typed: ignore

# Copyright (c) 2015 Sqreen. All Rights Reserved.
# Please refer to our terms for more information: https://www.sqreen.com/terms.html

require 'ipaddr'
require 'set'
require 'timeout'
require 'json'
require 'sqreen/events/attack'
require 'sqreen/log'
require 'sqreen/agent_message'
require 'sqreen/rules'
require 'sqreen/session'
require 'sqreen/remote_command'
require 'sqreen/capped_queue'
require 'sqreen/metrics_store'
require 'sqreen/deliveries/simple'
require 'sqreen/deliveries/batch'
require 'sqreen/endpoint_testing'
require 'sqreen/performance_notifications/metrics'
require 'sqreen/performance_notifications/binned_metrics'
require 'sqreen/legacy/instrumentation'
require 'sqreen/call_countable'
require 'sqreen/weave/legacy/instrumentation'
require 'sqreen/kit/configuration'

module Sqreen
  @features = {}
  # Event queue that enables communication between threads and the reporter
  @queue = nil

  # Capacity of the event queue (see Sqreen.queue)
  MAX_QUEUE_LENGTH = 100
  # Capacity of the observations queue (see Sqreen.observations_queue)
  MAX_OBS_QUEUE_LENGTH = 1000

  # Event value that makes the runner aggregate observations instead of
  # posting the event to the deliverer (see Runner#handle_event).
  METRICS_EVENT = 'metrics'.freeze

  PERF_METRICS_PERIOD = 60 # 1 min
  DEFAULT_PERF_LEVEL = 0 # disabled
  DEFAULT_USE_SIGNALS = false

  class << self
    # @return [Hash] the features hash most recently set by the runner
    attr_reader :features

    # Replaces the module-level features hash.
    # @param features [Hash]
    def update_features(features)
      @features = features
    end

    # Event queue, created lazily with MAX_QUEUE_LENGTH capacity.
    # @return [CappedQueue]
    def queue
      @queue ||= CappedQueue.new(MAX_QUEUE_LENGTH)
    end

    # Replaces the event queue (the runner installs a fresh one on startup).
    # @param queue [CappedQueue]
    def update_queue(queue)
      @queue = queue
    end

    # Queue of raw observations drained by Runner#aggregate_observations,
    # created lazily with MAX_OBS_QUEUE_LENGTH capacity.
    # @return [CappedQueue]
    def observations_queue
      @observations_queue ||= CappedQueue.new(MAX_OBS_QUEUE_LENGTH)
    end

    attr_accessor :instrumentation_ready
    alias instrumentation_ready? instrumentation_ready

    attr_accessor :logged_in
    alias logged_in? logged_in

    attr_reader :whitelisted_paths

    # Stores the whitelisted paths as a frozen object.
    # @param paths [Array<String>]
    def update_whitelisted_paths(paths)
      @whitelisted_paths = paths.freeze
    end

    attr_reader :whitelisted_ips

    # Stores a frozen Hash mapping each given address string to its parsed
    # IPAddr.
    # @param ips [Enumerable<String>] addresses accepted by IPAddr.new
    def update_whitelisted_ips(ips)
      @whitelisted_ips = Hash[ips.map { |v| [v, IPAddr.new(v)] }].freeze
    end

    attr_reader :performance_budget

    # Stores value / 1000 as a Float, or clears the budget when value is nil.
    # NOTE(review): value is presumably in milliseconds (stored in seconds);
    # confirm against the backend contract.
    # @param value [Numeric, String, nil]
    def update_performance_budget(value)
      return @performance_budget = nil if value.nil?
      @performance_budget = value.to_f / 1000
    end
  end

  # Main running job class for the agent
  class Runner
    # During one hour
    HEARTBEAT_WARMUP = 60 * 60
    # Initial delay is 5 minutes
    HEARTBEAT_MAX_DELAY = 5 * 60

    attr_accessor :heartbeat_delay
    attr_accessor :metrics_engine
    # @return [Sqreen::Deliveries::Simple]
    attr_reader :deliverer
    # @return [Sqreen::Session]
    attr_reader :session
    attr_reader :instrumenter
    attr_accessor :running

    attr_accessor :next_command_results
    attr_accessor :next_metrics

    # We may want to do that in a thread in order to prevent delaying app
    # startup.
    #
    # @param configuration [#get] agent configuration
    # @param framework framework adapter used for login and instrumentation
    # @param set_at_exit [Boolean] when false, no global at_exit hook is
    #   placed (used for testing)
    # @param session_class [Class] session implementation to instantiate
    # @raise [Sqreen::TokenNotFoundException] when no token is configured
    def initialize(configuration, framework, set_at_exit = true, session_class = Sqreen::Session)
      Sqreen.update_queue(CappedQueue.new(MAX_QUEUE_LENGTH))
      @logged_out_tried = false
      @configuration = configuration
      @framework = framework
      @heartbeat_delay = HEARTBEAT_MAX_DELAY
      @last_heartbeat_request = Time.now
      @next_command_results = {}
      @next_metrics = []
      @running = true

      @proxy_url = @configuration.get(:proxy_url)

      chosen_endpoints = determine_endpoints

      @token = @configuration.get(:token)
      @app_name = @configuration.get(:app_name)
      @url = chosen_endpoints.control.url
      @cert_store = chosen_endpoints.control.ca_store

      Sqreen.update_whitelisted_paths([])
      Sqreen.update_whitelisted_ips({})
      Sqreen.update_performance_budget(nil)
      raise(Sqreen::TokenNotFoundException, 'no token found') unless @token

      Sqreen::Kit::Configuration.logger = Sqreen.log
      Sqreen::Kit::Configuration.ingestion_url = chosen_endpoints.ingestion.url
      Sqreen::Kit::Configuration.certificate_store = chosen_endpoints.ingestion.ca_store
      Sqreen::Kit::Configuration.proxy_url = @proxy_url

      register_exit_cb if set_at_exit

      self.metrics_engine = MetricsStore.new

      # Weave instrumentation is forced when scout_apm >= 2.5.2 is installed.
      needs_weave = proc do
        Gem::Specification.select { |s| s.name == 'scout_apm' && Gem::Requirement.new('>= 2.5.2').satisfied_by?(Gem::Version.new(s.version)) }.any?
      end

      if @configuration.get(:weave) || needs_weave.call
        @instrumenter = Sqreen::Weave::Legacy::Instrumentation.new(metrics_engine)
      else
        @instrumenter = Sqreen::Legacy::Instrumentation.new(metrics_engine)
      end

      # NOTE(review): this writes the full token to the debug log; consider masking it.
      Sqreen.log.debug "Using token #{@token}"
      response = create_session(session_class)
      post_endpoint_testing_msgs(chosen_endpoints)

      wanted_features = response.fetch('features', {})
      conf_initial_features = configuration.get(:initial_features)
      unless conf_initial_features.nil?
        begin
          # Locally configured features (a JSON object) override the ones
          # returned by login.
          conf_features = JSON.parse(conf_initial_features)
          raise 'Invalid Type' unless conf_features.is_a?(Hash)
          Sqreen.log.debug do
            "Override initial features with #{conf_features.inspect}"
          end
          wanted_features = wanted_features.merge(conf_features)
        rescue StandardError
          Sqreen.log.warn do
            "NOT using invalid initial features #{conf_initial_features}"
          end
        end
      end
      self.features = wanted_features

      # Ensure a deliverer is there unless features have set it first
      self.deliverer ||= Deliveries::Simple.new(session)

      context_infos = {}
      %w[rules pack_id].each do |p|
        context_infos[p] = response[p] unless response[p].nil?
      end
      process_commands(response.fetch('commands', []), context_infos)
    end

    # Builds the session and logs in.
    # @return the value returned by session.login
    def create_session(session_class)
      @session = session_class.new(@url, @cert_store, @token, @app_name, @proxy_url)
      session.login(@framework)
    end

    # Swaps the deliverer, draining the previous one first.
    def deliverer=(new_deliverer)
      deliverer.drain if deliverer
      @deliverer = new_deliverer
    end

    # Chooses the delivery strategy.
    # Batch size < 1 selects simple (unbatched) delivery. Signals require
    # batching, so batch sizes <= 1 are bumped to 30 (staleness 60) when
    # use_signals is set.
    def batch_events(batch_size, max_staleness = nil, use_signals = false)
      size = batch_size.to_i
      if size <= 1 && use_signals
        Sqreen.log.warn do
          "Using signals with no delivery batching is unsupported. " \
          "Using instead batching with batch size = 30, max_staleness = 60"
        end
        size = 30
        max_staleness = 60
      end
      self.deliverer = if size < 1
                         Deliveries::Simple.new(session)
                       else
                         staleness = max_staleness.to_i
                         Deliveries::Batch.new(session, size, staleness)
                       end
    end

    # Collects the rules to instrument.
    # Rules come from context_infos when both 'rules' and 'pack_id' are
    # present, otherwise from session.rules. Enabled local rules are
    # appended and tagged with rulespack_id 'local'.
    # @return [Array(Object, Array<Hash>)] [rulespack_id, rules]
    def load_rules(context_infos = {})
      rules_pack = context_infos['rules']
      rulespack_id = context_infos['pack_id']
      if rules_pack.nil? || rulespack_id.nil?
        session_rules = session.rules
        rules_pack = session_rules['rules']
        rulespack_id = session_rules['pack_id']
      end
      rules = rules_pack.each { |r| r['rulespack_id'] = rulespack_id }
      Sqreen.log.info { format('retrieved rulespack id: %s', rulespack_id) }
      Sqreen.log.debug { format('retrieved %d rules', rules.size) }
      local_rules = Sqreen::Rules.local(@configuration) || []
      rules += local_rules.
               select { |rule| rule['enabled'] }.
               each { |r| r['rulespack_id'] = 'local' }
      Sqreen.log.debug do
        format('rules: %s',
               rules.
               sort_by { |r| r['name'] }.
               map { |r| format('(%s, %s, %s)', r[Rules::Attrs::NAME], r.to_json.size, r[Rules::Attrs::BLOCK]) }.
               join(', '))
      end
      [rulespack_id, rules]
    end

    # Creates the call-count 'Sum' metric with the given period.
    # Non-positive values are ignored.
    def call_counts_metrics_period=(value)
      value = value.to_i
      return unless value > 0 # else disable collection?
      metrics_engine.create_metric('name' => CallCountable::COUNT_CALLS,
                                   'period' => value,
                                   'kind' => 'Sum')
    end

    # Enables performance metrics for a positive period, disables them otherwise.
    def performance_metrics_period=(value)
      value = value.to_i
      if value > 0
        PerformanceNotifications::Metrics.enable(metrics_engine, value)
      else
        PerformanceNotifications::Metrics.disable
      end
    end

    # Enables binned metrics for level >= 1 and disables them for level <= 0.
    # Levels other than 1 are treated as 1, with a warning.
    def config_binned_metrics(level, base, factor, base_pct, factor_pct)
      level = level.to_i
      if level <= 0
        Sqreen.log.info('Disabling binned metrics')
        PerformanceNotifications::BinnedMetrics.disable
      else
        Sqreen.log.info('Enabling binned metrics')
        Sqreen.log.warn("Unknown value for perf_level: #{level}. Treating as 1") unless level == 1
        PerformanceNotifications::BinnedMetrics.enable(
          metrics_engine, PERF_METRICS_PERIOD, base.to_f, factor.to_f, base_pct.to_f, factor_pct.to_f
        )
      end
    end

    # Loads rules and asks the framework to instrument them.
    # @return [String] the rulespack id
    def setup_instrumentation(context_infos = {})
      Sqreen.log.info 'Setting up instrumentation'
      rulespack_id, rules = load_rules(context_infos)
      @framework.instrument_when_ready!(instrumenter, rules)
      Sqreen.log.info 'Instrumentation set up'
      rulespack_id.to_s
    end

    # Removes every callback and clears the actions repository.
    def remove_instrumentation(_context_infos = {})
      Sqreen.log.debug 'Removing instrumentation'
      instrumenter.remove_all_callbacks
      Sqreen::Actions::Repository.clear
      Sqreen.log.debug 'Instrumentation removed'
      true
    end

    # Fetches rules from the session and re-instruments with them.
    # @return [String] the rulespack id
    def reload_rules(_context_infos = {})
      Sqreen.log.debug 'Reloading rules'
      rulespack_id, rules = load_rules
      instrumenter.remove_all_callbacks

      @framework.instrument_when_ready!(instrumenter, rules)
      Sqreen.log.debug 'Rules reloaded'
      rulespack_id.to_s
    end

    # Fetches the actions pack and installs a new actions repository.
    # @return [true, RemoteCommand::FailureOutput]
    def reload_actions(_context_infos = {})
      Sqreen.log.debug 'Reloading actions'

      data = session.get_actionspack

      unless data.respond_to?(:[]) && data['status']
        Sqreen.log.warn('Could not load actions')
        return RemoteCommand::FailureOutput.new(
          :error => 'Could not load actions from /actionspack'
        )
      end

      action_hashes = data['actions']
      unless action_hashes.respond_to? :each
        Sqreen.log.warn('No action definitions in response')
        return RemoteCommand::FailureOutput.new(
          :error => 'No action definitions in response'
        )
      end
      Sqreen.log.debug("Loading actions from hashes #{action_hashes}")

      unsupported = load_actions(action_hashes)

      if unsupported.empty?
        true
      else
        RemoteCommand::FailureOutput.new(:unsupported_actions => unsupported.to_a)
      end
    end

    # Runs remote commands and keeps their results for the next heartbeat.
    def process_commands(commands, context_infos = {})
      return if commands.nil? || commands.empty?

      res = RemoteCommand.process_list(self, commands, context_infos)
      @next_command_results = res
    end

    # Sends a heartbeat with pending command results and metrics, then runs
    # the commands it returns. With signals enabled, metrics are posted as
    # events instead of being embedded in the heartbeat.
    def do_heartbeat
      @last_heartbeat_request = Time.now
      @next_metrics.concat(metrics_engine.publish(false)) if metrics_engine
      metrics_in_hb = use_signals? ? nil : next_metrics
      res = session.heartbeat(next_command_results, metrics_in_hb)
      next_command_results.clear
      deliver_metrics_as_event if use_signals?
      next_metrics.clear
      process_commands(res['commands'])
    end

    # Posts each pending metric through the deliverer.
    def deliver_metrics_as_event
      # This is disastrous with the simple delivery strategy,
      # as each aggregated metric would trigger an http request.
      # Sending of metrics is therefore not supported with simple delivery strategy
      # TODO: Confirm that only batch is used in production
      next_metrics.each { |x| deliverer.post_event(x) }
    end

    # @return [Hash] the current module-level features
    def features(_context_infos = {})
      Sqreen.features
    end

    def use_signals?
      features.fetch('use_signals', DEFAULT_USE_SIGNALS)
    end

    # Applies a features hash: session flags, performance/binned/call-count
    # metrics, heartbeat delay and delivery batching.
    # A nil hash is treated as empty.
    def features=(features)
      features ||= {}
      Sqreen.update_features(features)
      if session
        session.request_compression = features['request_compression']
        session.use_signals = use_signals?
      end
      self.performance_metrics_period = features['performance_metrics_period']

      unless @configuration.get(:weave)
        config_binned_metrics(features['perf_level'] || DEFAULT_PERF_LEVEL,
                              features['perf_base'] || PerformanceNotifications::BinnedMetrics::DEFAULT_PERF_BASE,
                              features['perf_unit'] || PerformanceNotifications::BinnedMetrics::DEFAULT_PERF_UNIT,
                              features['perf_pct_base'] || PerformanceNotifications::BinnedMetrics::DEFAULT_PERF_PCT_BASE,
                              features['perf_pct_unit'] || PerformanceNotifications::BinnedMetrics::DEFAULT_PERF_PCT_UNIT)
      end

      self.call_counts_metrics_period = features['call_counts_metrics_period']
      hd = features['heartbeat_delay'].to_i
      self.heartbeat_delay = hd if hd > 0
      return if features['batch_size'].nil?
      batch_events(features['batch_size'], features['max_staleness'], use_signals?)
    end

    # @return [Boolean] false when paths is not enumerable
    def change_whitelisted_paths(paths, _context_infos = {})
      return false unless paths.respond_to?(:each)
      Sqreen.update_whitelisted_paths(paths)
      true
    end

    # Updates the performance budget, through Weave::Budget in weave mode.
    # In weave mode, the 'performance_budget' feature's 'threshold' and
    # 'ratio' keys take precedence over the budget argument.
    # @return [false, Hash] false for a non-positive budget, otherwise
    #   { :was => previous budget }
    def change_performance_budget(budget, _context_infos = {})
      return false unless budget.nil? || budget.to_f > 0

      if @configuration.get(:weave)
        prev = Sqreen::Weave::Budget.current
        prev = prev.to_h if prev

        budget_s = budget.to_f / 1000 if budget
        ratio = nil
        feature = features['performance_budget']
        if feature
          budget_s = feature['threshold'] if feature.key?('threshold')
          ratio = feature['ratio'] if feature.key?('ratio')
        end

        Sqreen::Weave::Budget.update(threshold: budget_s, ratio: ratio)
      else
        prev = Sqreen.performance_budget
        Sqreen.update_performance_budget(budget)
      end

      { :was => prev }
    end

    # Posts the dependency bundle.
    # @return [Float] seconds taken by the upload
    def upload_bundle(_context_infos = {})
      t = Time.now
      session.post_bundle(RuntimeInfos.dependencies_signature, RuntimeInfos.dependencies)
      Time.now - t
    end

    # @return [Boolean] false when ips is not enumerable
    def change_whitelisted_ips(ips, _context_infos = {})
      return false unless ips.respond_to?(:each)
      Sqreen.update_whitelisted_ips(ips)
      true
    end

    # @return [Hash] { 'was' => previous features, 'now' => new_features }
    def change_features(new_features, _context_infos = {})
      old = features
      self.features = new_features
      {
        'was' => old,
        'now' => new_features,
      }
    end

    # Drains the observations currently queued into the metrics engine.
    # Timestamps are shifted by (Sqreen.time - wall clock).
    def aggregate_observations
      q = Sqreen.observations_queue
      conv = Sqreen.time - Time.now.utc.to_f
      q.size.times do
        cat, key, obs, t = q.pop
        metrics_engine.update(cat, conv + t.utc.to_f, key, obs)
      end
    end

    def heartbeat_needed?
      (@last_heartbeat_request + heartbeat_delay) < Time.now
    end

    # Waits up to heartbeat_delay for one event.
    # Runs periodic cleanup on timeout; otherwise handles the event and also
    # forces cleanup when a heartbeat is overdue.
    def run_watcher_once
      event = Timeout.timeout(heartbeat_delay) do
        Sqreen.queue.pop
      end
    rescue Timeout::Error
      periodic_cleanup
    else
      handle_event(event)
      if heartbeat_needed?
        # Also aggregate/post metrics when cleanup has
        # not been done for a long time
        Sqreen.log.debug 'Forced an heartbeat'
        periodic_cleanup # will trigger do_heartbeat since it's time
      end
    ensure
      PerformanceNotifications::BinnedMetrics.finish_watcher_run
    end

    def periodic_cleanup
      # Nothing occurred:
      # tick delivery, aggregates_metrics
      # issue a simple heartbeat if it's time (which may return commands)
      @deliverer.tick
      aggregate_observations
      do_heartbeat if heartbeat_needed?
    end

    # METRICS_EVENT triggers aggregation; anything else goes to the deliverer.
    def handle_event(event)
      if event == METRICS_EVENT
        aggregate_observations
      else
        @deliverer.post_event(event)
      end
    end

    def run_watcher
      run_watcher_once while running
    end

    # Sinatra is using at_exit to run the application, see:
    # https://github.com/sinatra/sinatra/blob/cd503e6c590cd48c2c9bb7869522494bfc62cb14/lib/sinatra/main.rb#L25
    def exit_from_sinatra_startup?
      defined?(Sinatra::Application) &&
        Sinatra::Application.respond_to?(:run?) &&
        !Sinatra::Application.run?
    end

    def shutdown(_context_infos = {})
      remove_instrumentation
      logout
    end

    # Shuts down, then restarts the worker in a new thread after sleeping
    # twice the current heartbeat delay.
    def restart(_context_infos = {})
      shutdown
      heartbeat_delay = @heartbeat_delay
      Thread.new do
        sleep(2 * heartbeat_delay)
        Sqreen::Worker.start(Sqreen.framework)
      end
    end

    # Flushes pending data and logs the session out (at most once).
    # In development mode it only stops the runner.
    def logout(retrying = true)
      return unless session
      Sqreen.log.debug("Logging out")
      if @framework.development?
        @running = false
        return
      end
      if @logged_out_tried
        Sqreen.log.debug('Not running logout twice')
        return
      end
      @logged_out_tried = true
      @deliverer.drain if @deliverer
      aggregate_observations
      session.post_metrics(metrics_engine.publish) if metrics_engine
      session.logout(retrying)
      @running = false
    end

    # Registers an at_exit hook that logs out.
    # During Sinatra startup the hook re-registers itself once, so that it
    # runs after Sinatra's own at_exit. Errors are only logged at debug level.
    def register_exit_cb(try_again = true)
      at_exit do
        if exit_from_sinatra_startup? && try_again
          register_exit_cb(false)
        else
          begin
            logout
          rescue StandardError => e
            Sqreen.log.debug(e.inspect)
            Sqreen.log.debug(e.backtrace)
            nil
          end
        end
      end
    end

    private

    # Posts endpoint-testing messages as agent messages. Best effort: errors
    # are logged and recorded, never raised.
    def post_endpoint_testing_msgs(chosen_endpoints)
      chosen_endpoints.messages.each do |msg|
        session.post_agent_message(@framework, msg)
      end
    rescue StandardError => e
      Sqreen.log.warn "Error submitting agent message: #{e}"
      RemoteException.record(e)
    end

    def determine_endpoints
      # there's no sniffing going on; just a misnamed config setting
      if @configuration.get(:no_sniff_domains)
        # reproduces behaviour before endpoint testing was introduced
        EndpointTesting.no_test_endpoints(@configuration.get(:url),
                                          @configuration.get(:ingestion_url))
      else
        EndpointTesting.test_endpoints(@proxy_url,
                                       @configuration.get(:url),
                                       @configuration.get(:ingestion_url))
      end
    end

    # Builds a new actions repository and installs it as current.
    # Unknown action types are skipped; other deserialization errors raise
    # Sqreen::Exception.
    # @return [Set] the unsupported action types
    def load_actions(hashes)
      unsupported = Set.new

      new_repos = Sqreen::Actions::Repository.new
      actions = hashes.map do |h|
        begin
          act = Sqreen::Actions.deserialize_action(h)
          new_repos.add h['parameters'], act
          act
        rescue Sqreen::Actions::UnknownActionType => e
          Sqreen.log.warn("Unsupported action type: #{e.action_type}")
          unsupported << e.action_type
          nil
        rescue StandardError => e
          raise Sqreen::Exception, "Invalid action hash: #{h}: #{e.message}"
        end
      end

      actions = actions.reject(&:nil?)
      Sqreen.log.debug("Added #{actions.size} valid actions")

      Sqreen::Actions::Repository.current = new_repos

      unsupported
    end
  end
end