lib/karafka/web/tracking/consumers/sampler.rb in karafka-web-0.2.2 vs lib/karafka/web/tracking/consumers/sampler.rb in karafka-web-0.2.3
- old
+ new
@@ -58,11 +58,11 @@
process: {
started_at: @started_at,
name: process_name,
status: ::Karafka::App.config.internal.status.to_s,
- listeners: Karafka::Server.listeners.count,
+ listeners: listeners,
concurrency: concurrency,
memory_usage: memory_usage,
memory_total_usage: memory_total_usage,
memory_size: memory_size,
cpu_count: cpu_count,
@@ -74,11 +74,11 @@
ruby: ruby_version,
karafka: ::Karafka::VERSION,
waterdrop: ::WaterDrop::VERSION
},
- stats: Karafka::Server.jobs_queue.statistics.merge(
+ stats: jobs_queue_statistics.merge(
utilization: utilization
).merge(total: @counters),
consumer_groups: @consumer_groups,
jobs: jobs.values
@@ -110,10 +110,16 @@
# We divide by 1_000 to convert from milliseconds
# We multiply by 100 to have it in % scale
times[:total].sum / 1_000 / concurrency / timefactor * 100
end
+ # @return [Integer] number of listeners
+ def listeners
+ # This can be zero before the server starts
+ Karafka::Server.listeners&.count.to_i
+ end
+
# @return [String] Unique process name
def process_name
@process_name ||= "#{Socket.gethostname}:#{::Process.pid}:#{SecureRandom.hex(6)}"
end
@@ -138,9 +144,15 @@
.last
.to_i
else
0
end
+ end
+
+ # @return [Hash] job queue statistics
+ def jobs_queue_statistics
+ # We return empty stats in case jobs queue is not yet initialized
+ Karafka::Server.jobs_queue&.statistics || { busy: 0, enqueued: 0 }
end
# Total memory used in the OS
def memory_total_usage
case RUBY_PLATFORM