lib/karafka/web/tracking/consumers/sampler.rb in karafka-web-0.6.3 vs lib/karafka/web/tracking/consumers/sampler.rb in karafka-web-0.7.0

- old
+ new

@@ -12,37 +12,48 @@ attr_reader :counters, :consumer_groups, :errors, :times, :pauses, :jobs # Current schema version # This can be used in the future for detecting incompatible changes and writing # migrations - SCHEMA_VERSION = '1.2.0' + SCHEMA_VERSION = '1.2.3' # 60 seconds window for time tracked window-based metrics TIMES_TTL = 60 # Times ttl in ms TIMES_TTL_MS = TIMES_TTL * 1_000 - private_constant :TIMES_TTL, :TIMES_TTL_MS, :SCHEMA_VERSION + # Counters that count events occurrences during the given window + COUNTERS_BASE = { + # Number of processed batches + batches: 0, + # Number of processed messages + messages: 0, + # Number of errors that occurred + errors: 0, + # Number of retries that occurred + retries: 0, + # Number of messages considered dead + dead: 0 + }.freeze + private_constant :TIMES_TTL, :TIMES_TTL_MS, :COUNTERS_BASE + def initialize super - @counters = { - batches: 0, - messages: 0, - errors: 0, - retries: 0, - dead: 0 - } + @counters = COUNTERS_BASE.dup @times = TtlHash.new(TIMES_TTL_MS) @consumer_groups = {} @errors = [] @started_at = float_now @pauses = Set.new @jobs = {} @shell = MemoizedShell.new + @memory_total_usage = 0 + @memory_usage = 0 + @cpu_usage = [-1, -1, -1] end # We cannot report and track the same time, that is why we use mutex here. To make sure # that samples aggregations and counting does not interact with reporter flushing. def track @@ -61,16 +72,17 @@ process: { started_at: @started_at, name: process_name, status: ::Karafka::App.config.internal.status.to_s, listeners: listeners, - concurrency: concurrency, - memory_usage: memory_usage, - memory_total_usage: memory_total_usage, + workers: workers, + memory_usage: @memory_usage, + memory_total_usage: @memory_total_usage, memory_size: memory_size, - cpu_count: cpu_count, - cpu_usage: cpu_usage, + cpus: cpus, + threads: threads, + cpu_usage: @cpu_usage, tags: Karafka::Process.tags }, versions: { ruby: ruby_version, @@ -99,10 +111,21 @@ @counters.each { |k, _| @counters[k] = 0 } @errors.clear end + # @note This should run before any mutex, so other threads can continue as those + # operations may invoke shell commands + def sample + memory_threads_ps + + @memory_usage = memory_usage + @memory_total_usage = memory_total_usage + @cpu_usage = cpu_usage + @threads = threads + end + private # @return [Numeric] % utilization of all the threads. 100% means all the threads are # utilized all the time within the given time window. 0% means, nothing is happening # most if not all the time. @@ -113,11 +136,11 @@ timefactor = float_now - @started_at timefactor = timefactor > TIMES_TTL ? TIMES_TTL : timefactor # We divide by 1_000 to convert from milliseconds # We multiply by 100 to have it in % scale - times[:total].sum / 1_000 / concurrency / timefactor * 100 + times[:total].sum / 1_000 / workers / timefactor * 100 end # @return [Integer] number of listeners def listeners # This can be zero before the server starts @@ -150,44 +173,40 @@ end # @return [Hash] job queue statistics def jobs_queue_statistics # We return empty stats in case jobs queue is not yet initialized + # busy - represents number of jobs that are being executed currently + # enqueued - represents number of jobs that are enqueued to be processed Karafka::Server.jobs_queue&.statistics || { busy: 0, enqueued: 0 } end # Total memory used in the OS def memory_total_usage - case RUBY_PLATFORM - when /darwin|bsd|linux/ - @shell - .call('ps -A -o rss=') - .split("\n") - .inject { |a, e| a.to_i + e.strip.to_i } - else - 0 - end + return 0 unless @memory_threads_ps + + @memory_threads_ps.map(&:first).sum end # @return [Integer] total amount of memory def memory_size @memory_size ||= case RUBY_PLATFORM when /linux/ @shell - .call('grep MemTotal /proc/meminfo') - .match(/(\d+)/) - .to_s - .to_i + .call('grep MemTotal /proc/meminfo') + .match(/(\d+)/) + .to_s + .to_i when /darwin|bsd/ @shell - .call('sysctl -a') - .split("\n") - .find { |line| line.start_with?('hw.memsize:') } - .to_s - .split(' ') - .last - .to_i + .call('sysctl -a') + .split("\n") + .find { |line| line.start_with?('hw.memsize:') } + .to_s + .split(' ') + .last + .to_i else 0 end end @@ -204,17 +223,39 @@ else [-1, -1, -1] end end + # @return [Integer] number of process threads. + # @note This returns total number of threads from the OS perspective including native + # extensions threads, etc. + def threads + return 0 unless @memory_threads_ps + + @memory_threads_ps.find { |row| row.last == ::Process.pid }[1] + end + # @return [Integer] CPU count - def cpu_count - @cpu_count ||= Etc.nprocessors + def cpus + @cpus ||= Etc.nprocessors end # @return [Integer] number of threads that process work - def concurrency - @concurrency ||= Karafka::App.config.concurrency + def workers + @workers ||= Karafka::App.config.concurrency + end + + # Loads our ps results into memory so we can extract from them whatever we need + def memory_threads_ps + @memory_threads_ps = case RUBY_PLATFORM + when /darwin|bsd|linux/ + @shell + .call('ps -A -o rss=,thcount,pid') + .split("\n") + .map { |row| row.strip.split(' ').map(&:to_i) } + else + @memory_threads_ps = false + end end end end end end