require 'time' # Time#iso8601 lives in the 'time' stdlib extension on Ruby < 3.4
require 'soar_thread_worker/thread_worker'

module SoarAuditingProvider
  # Background worker that drains an in-memory queue of audit events and
  # hands each one to a configured auditor method, retrying failed deliveries
  # with exponential back-off. Delivery metrics are exposed through the
  # readers below and through #status_detail.
  #
  # NOTE(review): thread lifecycle (start/stop/running?) is inherited from
  # SoarThreadWorker::ThreadWorker, whose implementation is not visible here.
  class AuditingWorker < SoarThreadWorker::ThreadWorker
    attr_reader :failed_audit_attempts
    attr_reader :latest_failed_audit_timestamp
    attr_reader :latest_failed_audit_error_message
    attr_reader :successful_audits
    attr_reader :dequeued_audits
    attr_reader :latest_successful_audit_timespan
    attr_reader :latest_successful_audit_timestamp

    def initialize
      @queue = Queue.new
      @start_mutex = Mutex.new
      initialize_metrics
    end

    # Applies queue/back-off settings and the callable used to deliver audits.
    #
    # @param queue_worker_configuration [Hash] string keys 'queue_size',
    #   'initial_back_off_in_seconds', 'back_off_multiplier', 'back_off_attempts'
    #   (values are coerced with #to_i)
    # @param auditor_audit_method [#call] invoked as call(level, data)
    # @raise [ArgumentError] if queue_size or back_off_attempts is < 1
    def configure(queue_worker_configuration:, auditor_audit_method:)
      validate_configuration(queue_worker_configuration)
      @maximum_queue_size = queue_worker_configuration['queue_size'].to_i
      @initial_back_off_in_seconds = queue_worker_configuration['initial_back_off_in_seconds'].to_i
      @back_off_multiplier = queue_worker_configuration['back_off_multiplier'].to_i
      @maximum_back_off_attempts = queue_worker_configuration['back_off_attempts'].to_i
      @auditor_audit_method = auditor_audit_method
    end

    # Queues one audit event and makes sure the worker thread is alive.
    #
    # @raise [AuditingOverflowError] when the queue already holds
    #   @maximum_queue_size events (the worker is not (re)started in that case)
    def enqueue(level, data)
      raise AuditingOverflowError unless @queue.size < @maximum_queue_size

      @queue.push({ :level => level, :data => data })
      ensure_worker_is_running
    end

    # Starts the worker unless it is already running. The mutex prevents
    # concurrent callers from spawning the worker twice.
    def start(verbose: false)
      @start_mutex.synchronize do
        unless running?
          super()
          $stderr.puts("Auditing worker was not running and respawned") if verbose
        end
      end
    end

    # One iteration of the worker loop: pops an event and delivers it, retrying
    # indefinitely (with back-off) until it succeeds or the worker is stopping.
    #
    # @return [Boolean] true to tell the thread worker it is done (stopping),
    #   false to keep executing
    def execute
      audit_event = @queue.pop
      @dequeued_audits += 1
      failed_before = false
      begin
        if @stopping
          # Push the event back so the fallback flush mechanism can deal with it.
          @queue.push(audit_event) if audit_event
          return true
        end
        exponential_back_off(start_at_last_attempt: failed_before) do
          started_at = monotonic_now
          @auditor_audit_method.call(audit_event[:level], audit_event[:data])
          @latest_successful_audit_timespan = (monotonic_now - started_at).round(3)
          @latest_successful_audit_timestamp = Time.now.utc.iso8601(3)
          @successful_audits += 1
        end
      rescue Exception => e # rubocop:disable Lint/RescueException
        # Deliberately broad: exponential_back_off re-raises once its attempts
        # are exhausted and lets non-StandardError exceptions escape; in both
        # cases this worker logs and keeps retrying the same event rather than
        # dropping it.
        print_exception_with_message_to_stderr(nil, e)
        failed_before = true
        retry
      end
      false
    end

    # Gives the worker up to +timeout+ seconds to drain the queue; anything
    # left afterwards is stopped and dumped to stderr.
    def flush(timeout = 1)
      ensure_worker_is_running
      wait_for_worker_to_clear_queue(timeout)
      fallback_flush_to_stderr if @queue.size > 0
    end

    # @return [Hash{String=>Object}] snapshot of queue size and delivery metrics
    def status_detail
      {
        'queue_size' => @queue.size,
        'dequeued_audits' => @dequeued_audits,
        'successful_audits' => @successful_audits,
        'failed_audit_attempts' => @failed_audit_attempts,
        'latest_successful_audit_timespan' => @latest_successful_audit_timespan,
        'latest_successful_audit_timestamp' => @latest_successful_audit_timestamp,
        'latest_failed_audit_timestamp' => @latest_failed_audit_timestamp,
        'latest_failed_audit_error_message' => @latest_failed_audit_error_message
      }
    end

    private

    # Monotonic seconds; immune to wall-clock adjustments, used for durations.
    def monotonic_now
      Process.clock_gettime(Process::CLOCK_MONOTONIC)
    end

    def wait_for_worker_to_clear_queue(timeout = 1)
      deadline = monotonic_now + timeout
      sleep(0.1) until @queue.size == 0 || monotonic_now >= deadline
    end

    # Stops the worker and writes remaining events' data to stderr for at most
    # +timeout+ seconds. Errors are logged and re-raised.
    def fallback_flush_to_stderr(timeout = 1)
      $stderr.puts 'Unable to flush audit entries to auditor, stopping worker and flushing to stderr'
      ensure_worker_is_stopped
      deadline = monotonic_now + timeout
      until @queue.size == 0 || monotonic_now >= deadline
        begin
          # Non-blocking pop: if another thread emptied the queue after the
          # size check, a blocking pop would hang here forever.
          audit_event = @queue.pop(true)
        rescue ThreadError
          break
        end
        $stderr.puts audit_event[:data].to_s
      end
    rescue Exception => e # rubocop:disable Lint/RescueException
      print_exception_with_message_to_stderr('Failure during fallback attempt to flush audit entries to stderr', e)
      raise
    end

    # Writes "<notification>: <class>: <message>" to stderr (the notification
    # prefix is omitted when nil); appends the backtrace in development.
    def print_exception_with_message_to_stderr(notification, exception)
      message = "#{exception.class}: #{exception.message}"
      if ENV['RACK_ENV'] == 'development'
        message += ":\n\t" + Array(exception.backtrace).join("\n\t")
      end
      $stderr.puts(notification ? "#{notification}: #{message}" : message)
    end

    def ensure_worker_is_running
      start(verbose: true)
    end

    # Signals a graceful stop, waits up to 5 seconds for it, then forces it.
    def ensure_worker_is_stopped
      attempt_graceful_stop
      sleep_while_still_running(5)
      force_stop
    end

    def attempt_graceful_stop
      @stopping = true
    end

    def force_stop
      stop
    end

    def validate_configuration(queue_worker_configuration)
      queue_size = queue_worker_configuration['queue_size'].to_i
      raise ArgumentError, "Invalid queue size (#{queue_size})" if queue_size < 1

      back_off_attempts = queue_worker_configuration['back_off_attempts'].to_i
      raise ArgumentError, "Invalid number of back off attempts (#{back_off_attempts})" if back_off_attempts < 1
    end

    # Yields, retrying on StandardError with exponentially growing delays.
    # When +start_at_last_attempt+ is true (a previous round already failed),
    # it sleeps the maximum delay first and allows only the final retry.
    # Re-raises once attempts are exhausted or the worker is stopping;
    # non-StandardError exceptions propagate immediately.
    def exponential_back_off(start_at_last_attempt: false)
      attempt = 1
      if start_at_last_attempt
        attempt = @maximum_back_off_attempts
        sleep_unless_stopping(calculate_back_off_delay(@maximum_back_off_attempts))
      end
      begin
        yield
      rescue StandardError => exception
        @latest_failed_audit_error_message = "#{exception.class}: #{exception.message}"
        @latest_failed_audit_timestamp = Time.now.utc.iso8601(3)
        @failed_audit_attempts += 1
        raise unless attempt <= @maximum_back_off_attempts && !@stopping

        sleep_unless_stopping(calculate_back_off_delay(attempt))
        attempt += 1
        retry
      end
    end

    # initial * multiplier^(attempt - 1), in seconds.
    def calculate_back_off_delay(attempt)
      @initial_back_off_in_seconds * (@back_off_multiplier**(attempt - 1))
    end

    # Sleeps in 0.1 s steps for +desired_delay+ seconds, waking early on stop.
    def sleep_unless_stopping(desired_delay)
      deadline = monotonic_now + desired_delay
      sleep(0.1) until @stopping || monotonic_now >= deadline
    end

    # Waits up to +desired_delay+ seconds for the worker to stop running.
    # Uses running? (the same predicate #start uses) so an uninitialised (nil)
    # running flag also counts as "not running" instead of forcing a full wait.
    def sleep_while_still_running(desired_delay)
      deadline = monotonic_now + desired_delay
      sleep(0.1) until !running? || monotonic_now >= deadline
    end

    def initialize_metrics
      @failed_audit_attempts = 0
      @latest_failed_audit_timestamp = 0
      @successful_audits = 0
      @latest_successful_audit_timestamp = 0
      @dequeued_audits = 0
      @latest_successful_audit_timespan = 0
      @latest_failed_audit_error_message = "None"
    end
  end
end