lib/karafka/web/tracking/consumers/reporter.rb in karafka-web-0.7.7 vs lib/karafka/web/tracking/consumers/reporter.rb in karafka-web-0.7.8

- old
+ new

@@ -3,14 +3,11 @@ module Karafka module Web module Tracking module Consumers # Reports the collected data about the process and sends it, so we can use it in the UI - class Reporter - include ::Karafka::Core::Helpers::Time - include ::Karafka::Helpers::Async - + class Reporter < Tracking::Reporter # Minimum number of messages to produce to produce them in sync mode # This acts as a small back-off not to overload the system in case we would have # extremely big number of errors happening PRODUCE_SYNC_THRESHOLD = 25 @@ -19,16 +16,35 @@ # This mutex is shared between tracker and samplers so there is no case where metrics # would be collected same time tracker reports MUTEX = Mutex.new def initialize + super # Move back so first report is dispatched fast to indicate, that the process is alive @tracked_at = monotonic_now - 10_000 @report_contract = Consumers::Contracts::Report.new @error_contract = Tracking::Contracts::Error.new end + # We never report in initializing phase because things are not yet fully configured + # We never report in the initialized because server is not yet ready until Karafka is + # fully running and some of the things like listeners are not yet available + # + # This method will also be `false` in case we are not running in `karafka server` or + # in embedding, because in those cases Karafka does not go beyond the `initialized` phase + # + # @return [Boolean] are we able to report consumer state + def active? + # If we do not have a producer that we could use to report or it was closed, we cannot + # and should not report + return false unless super + return false if ::Karafka::App.initializing? + return false if ::Karafka::App.initialized? + + true + end + # Dispatches the current state from sampler to appropriate topics # # @param forced [Boolean] should we report bypassing the time frequency or should we # report only in case we would not send the report for long enough time. def report(forced: false) @@ -39,15 +55,10 @@ # other threads would need this mutex. This can take up to 25ms and we do not want to # block during this time sampler.sample MUTEX.synchronize do - # Start background thread only when needed - # This prevents us from starting it too early or for non-consumer processes where - # Karafka is being included - async_call unless @running - return unless report?(forced) @tracked_at = monotonic_now report = sampler.to_report @@ -95,34 +106,14 @@ report(forced: true) end private - # Reports the process state once in a while - def call - @running = true - - # We won't track more often anyhow but want to try frequently not to miss a window - # We need to convert the sleep interval into seconds for sleep - sleep_time = ::Karafka::Web.config.tracking.interval.to_f / 1_000 / 10 - - loop do - report - - sleep(sleep_time) - end - end - # @param forced [Boolean] is this report forced. Forced means that as long as we can # flush we will flush # @return [Boolean] Should we report or is it not yet time to do so def report?(forced) - # We never report in initializing phase because things are not yet fully configured - return false if ::Karafka::App.initializing? - # We never report in the initialized because server is not yet ready until Karafka is - # fully running and some of the things like listeners are not yet available - return false if ::Karafka::App.initialized? - + return false unless active? return true if forced (monotonic_now - @tracked_at) >= ::Karafka::Web.config.tracking.interval end