lib/karafka/web/tracking/consumers/sampler.rb in karafka-web-0.2.2 vs lib/karafka/web/tracking/consumers/sampler.rb in karafka-web-0.2.3

- old
+ new

@@ -58,11 +58,11 @@ process: { started_at: @started_at, name: process_name, status: ::Karafka::App.config.internal.status.to_s, - listeners: Karafka::Server.listeners.count, + listeners: listeners, concurrency: concurrency, memory_usage: memory_usage, memory_total_usage: memory_total_usage, memory_size: memory_size, cpu_count: cpu_count, @@ -74,11 +74,11 @@ ruby: ruby_version, karafka: ::Karafka::VERSION, waterdrop: ::WaterDrop::VERSION }, - stats: Karafka::Server.jobs_queue.statistics.merge( + stats: jobs_queue_statistics.merge( utilization: utilization ).merge(total: @counters), consumer_groups: @consumer_groups, jobs: jobs.values @@ -110,10 +110,16 @@ # We divide by 1_000 to convert from milliseconds # We multiply by 100 to have it in % scale times[:total].sum / 1_000 / concurrency / timefactor * 100 end + # @return [Integer] number of listeners + def listeners + # This can be zero before the server starts + Karafka::Server.listeners&.count.to_i + end + # @return [String] Unique process name def process_name @process_name ||= "#{Socket.gethostname}:#{::Process.pid}:#{SecureRandom.hex(6)}" end @@ -138,9 +144,15 @@ .last .to_i else 0 end + end + + # @return [Hash] job queue statistics + def jobs_queue_statistics + # We return empty stats in case jobs queue is not yet initialized + Karafka::Server.jobs_queue&.statistics || { busy: 0, enqueued: 0 } end # Total memory used in the OS def memory_total_usage case RUBY_PLATFORM