lib/karafka/web/tracking/consumers/reporter.rb in karafka-web-0.7.7 vs lib/karafka/web/tracking/consumers/reporter.rb in karafka-web-0.7.8
- old
+ new
@@ -3,14 +3,11 @@
module Karafka
module Web
module Tracking
module Consumers
# Reports the collected data about the process and sends it, so we can use it in the UI
- class Reporter
- include ::Karafka::Core::Helpers::Time
- include ::Karafka::Helpers::Async
-
+ class Reporter < Tracking::Reporter
# Minimum number of messages to produce to produce them in sync mode
# This acts as a small back-off not to overload the system in case we would have
# extremely big number of errors happening
PRODUCE_SYNC_THRESHOLD = 25
@@ -19,16 +16,35 @@
# This mutex is shared between tracker and samplers so there is no case where metrics
# would be collected same time tracker reports
MUTEX = Mutex.new
def initialize
+ super
# Move back so first report is dispatched fast to indicate, that the process is alive
@tracked_at = monotonic_now - 10_000
@report_contract = Consumers::Contracts::Report.new
@error_contract = Tracking::Contracts::Error.new
end
+ # We never report in initializing phase because things are not yet fully configured
+ # We never report in the initialized because server is not yet ready until Karafka is
+ # fully running and some of the things like listeners are not yet available
+ #
+ # This method will also be `false` in case we are not running in `karafka server` or
+ # in embedding, because in those cases Karafka does not go beyond the `initialized` phase
+ #
+ # @return [Boolean] are we able to report consumer state
+ def active?
+ # If we do not have a producer that we could use to report or it was closed, we cannot
+ # and should not report
+ return false unless super
+ return false if ::Karafka::App.initializing?
+ return false if ::Karafka::App.initialized?
+
+ true
+ end
+
# Dispatches the current state from sampler to appropriate topics
#
# @param forced [Boolean] should we report bypassing the time frequency or should we
# report only in case we would not send the report for long enough time.
def report(forced: false)
@@ -39,15 +55,10 @@
# other threads would need this mutex. This can take up to 25ms and we do not want to
# block during this time
sampler.sample
MUTEX.synchronize do
- # Start background thread only when needed
- # This prevents us from starting it too early or for non-consumer processes where
- # Karafka is being included
- async_call unless @running
-
return unless report?(forced)
@tracked_at = monotonic_now
report = sampler.to_report
@@ -95,34 +106,14 @@
report(forced: true)
end
private
- # Reports the process state once in a while
- def call
- @running = true
-
- # We won't track more often anyhow but want to try frequently not to miss a window
- # We need to convert the sleep interval into seconds for sleep
- sleep_time = ::Karafka::Web.config.tracking.interval.to_f / 1_000 / 10
-
- loop do
- report
-
- sleep(sleep_time)
- end
- end
-
# @param forced [Boolean] is this report forced. Forced means that as long as we can
# flush we will flush
# @return [Boolean] Should we report or is it not yet time to do so
def report?(forced)
- # We never report in initializing phase because things are not yet fully configured
- return false if ::Karafka::App.initializing?
- # We never report in the initialized because server is not yet ready until Karafka is
- # fully running and some of the things like listeners are not yet available
- return false if ::Karafka::App.initialized?
-
+ return false unless active?
return true if forced
(monotonic_now - @tracked_at) >= ::Karafka::Web.config.tracking.interval
end