# typed: ignore

# Copyright (c) 2015 Sqreen. All Rights Reserved.
# Please refer to our terms for more information: https://www.sqreen.com/terms.html

require 'sqreen/kit/loggable'

module Sqreen
  module Kit
    module Signals
      # Buffers signals and traces pushed from any thread and reports them in
      # batches, through +auth_sig_client.report_batch+, from a dedicated
      # background thread.
      #
      # A batch is submitted once it holds at least +flush_size+ elements, or
      # when +max_delay_s+ seconds have elapsed since its first element was
      # collected. The drain step stops adding elements once the batch reaches
      # +max_batch_size+.
      class BatchCollector
        include Loggable

        # Unique marker pushed on the queue to ask the processing loop to stop.
        EXIT_SENTINEL = Object.new.freeze
        DEFAULT_MAX_DELAY_S = 45
        DEFAULT_FLUSH_SIZE = 30
        DEFAULT_MAX_BATCH_SIZE = 100

        attr_reader :auth_sig_client,
                    :flush_size,
                    :max_delay_s,
                    :max_batch_size,
                    :queue

        # @param auth_sig_client [AuthSignalsClient] client whose
        #   +report_batch+ receives every completed batch
        # @param opts [Hash] optional overrides
        # @option opts [Integer] :flush_size (DEFAULT_FLUSH_SIZE)
        # @option opts [Integer] :max_batch_size (DEFAULT_MAX_BATCH_SIZE)
        # @option opts [Numeric] :max_delay_s (DEFAULT_MAX_DELAY_S)
        # @raise [ArgumentError] if +max_batch_size+ is lower than +flush_size+
        def initialize(auth_sig_client, opts = {})
          @auth_sig_client = auth_sig_client
          @flush_size = opts[:flush_size] || DEFAULT_FLUSH_SIZE
          @max_batch_size = opts[:max_batch_size] || DEFAULT_MAX_BATCH_SIZE
          @max_delay_s = opts[:max_delay_s] || DEFAULT_MAX_DELAY_S
          @queue = QueueWithTimeout.new
          @thread = nil

          if max_batch_size < flush_size # rubocop:disable Style/GuardClause
            raise ArgumentError, 'max batch size < flush size'
          end
        end

        # Enqueues a signal or a trace for a later batch. The element is
        # dropped (with a warning) by the queue when it is full.
        def <<(signal_or_trace)
          @queue << signal_or_trace
        end

        # Spawns the background thread running the processing loop.
        # @return [Thread]
        def start
          @processing_loop = ProcessingLoop.new(self)
          @thread = Thread.new do
            @processing_loop.run
          end
        end

        # @return [Boolean] whether the background thread was started and is
        #   still alive
        def running?
          # Must read the ivar: there is no +thread+ reader on this class, so
          # the former bare +thread+ call raised NameError.
          return false if @thread.nil?
          @thread.alive?
        end

        # Asks the processing loop to stop and waits for its thread to finish.
        # Does nothing if the collector was never started.
        def close
          return if @thread.nil?
          # The sentinel bypasses the queue bound: if it were dropped because
          # the queue is full, the join below would never return.
          @queue.force_push(EXIT_SENTINEL)
          @thread.join
        end

        # Body of the background thread: pops elements from the collector's
        # queue, accumulates them and submits batches.
        class ProcessingLoop
          include Loggable

          # @param [BatchCollector] collector
          def initialize(collector)
            @collector = collector
            @next_batch = []
            @deadline = nil # Float epoch seconds, set when a batch is begun
          end

          def queue
            @collector.queue
          end

          def max_batch_size
            @collector.max_batch_size
          end

          def flush_size
            @collector.flush_size
          end

          # Runs until the exit sentinel is popped from the queue.
          def run
            while run_loop_once; end
            logger.info 'Collector thread exiting'
          end

          private

          # Performs one wait/collect/submit cycle.
          # @return [Boolean] false once the exit sentinel was received,
          #   true otherwise
          def run_loop_once
            el = queue.pop(@deadline)
            if el.nil? # deadline passed
              submit
            elsif el.equal?(EXIT_SENTINEL)
              # NOTE(review): a partially filled batch is discarded here
              # without being submitted - confirm this is intended.
              return false
            else # a signal or a trace
              if @next_batch.empty? # first object, set a deadline
                @deadline = Time.now.to_f + @collector.max_delay_s
              end
              @next_batch << el

              # drain the queue until it is empty or the batch is full
              until @next_batch.size >= max_batch_size || (el = queue.pop_nb).nil?
                if el.equal?(EXIT_SENTINEL)
                  # push it back so the next iteration exits; forced so that a
                  # queue refilled in the meantime cannot drop it
                  queue.force_push(EXIT_SENTINEL)
                  break
                end
                @next_batch << el
              end

              submit if @next_batch.size >= flush_size
            end

            true
          end

          # Reports the pending batch, if any, and clears the deadline.
          def submit
            logger.debug { "Batch submit. Batch size: #{@next_batch.size}" }
            @deadline = nil
            return if @next_batch.empty?

            cur_batch = @next_batch
            @next_batch = []
            @collector.auth_sig_client.report_batch(cur_batch)
          end
        end

        # Bounded FIFO queue, guarded by a mutex, whose blocking pop accepts
        # an optional deadline.
        #
        # Adapted from https://spin.atomicobject.com/2014/07/07/ruby-queue-pop-timeout/
        class QueueWithTimeout
          include Loggable

          MAX_QUEUE_SIZE = 1000

          def initialize
            @mutex = Mutex.new
            @queue = []
            @received = ConditionVariable.new
          end

          # Appends +x+ and wakes up one waiting consumer. When the queue
          # already holds MAX_QUEUE_SIZE elements, +x+ is dropped and a
          # warning is logged instead.
          def <<(x)
            @mutex.synchronize do
              if @queue.size >= MAX_QUEUE_SIZE
                # processing loop is prob spending too much time on http requests
                logger.warn "Queue is full! Dropping #{x}"
                next
              end
              @queue << x
              @received.signal
            end
          end

          # Appends +x+ regardless of MAX_QUEUE_SIZE and wakes up one waiting
          # consumer. Meant for control messages that must never be dropped.
          def force_push(x)
            @mutex.synchronize do
              @queue << x
              @received.signal
            end
          end

          # non-blocking pop
          # @return [Object, nil] the oldest element, or nil if the queue is
          #   empty
          def pop_nb
            @mutex.synchronize do
              return nil if @queue.empty?
              @queue.shift
            end
          end

          # Blocking pop.
          # @param deadline [Float] instant, expressed like +Time.now.to_f+,
          #   after which to give up waiting; nil waits indefinitely
          # @return [Object, nil] the oldest element, or nil if the deadline
          #   passed while the queue was still empty
          def pop(deadline = nil)
            @mutex.synchronize do
              if deadline.nil?
                # wait indefinitely until there is an element in the queue
                @received.wait(@mutex) while @queue.empty?
              elsif @queue.empty?
                # wait for element or timeout; re-check after each wake-up
                while @queue.empty? && (remaining_time = deadline - Time.now.to_f) > 0
                  @received.wait(@mutex, remaining_time)
                end
              end
              return nil if @queue.empty?
              @queue.shift
            end
          end
        end # end QueueWithTimeout
      end
    end
  end
end