lib/karafka/web/tracking/consumers/reporter.rb in karafka-web-0.6.3 vs lib/karafka/web/tracking/consumers/reporter.rb in karafka-web-0.7.0

- old
+ new

@@ -30,10 +30,18 @@ # Dispatches the current state from sampler to appropriate topics # # @param forced [Boolean] should we report bypassing the time frequency or should we # report only in case we would not send the report for long enough time. def report(forced: false) + # Do not even mutex if not needed + return unless report?(forced) + + # We run this sampling before the mutex so sampling does not stop things in case + # other threads would need this mutex. This can take up to 25ms and we do not want to + # block during this time + sampler.sample + MUTEX.synchronize do # Start background thread only when needed # This prevents us from starting it too early or for non-consumer processes where # Karafka is being included async_call unless @running @@ -50,28 +58,28 @@ # Report consumers statuses messages = [ { topic: ::Karafka::Web.config.topics.consumers.reports, - payload: report.to_json, + payload: Zlib::Deflate.deflate(report.to_json), key: process_name, - partition: 0 + partition: 0, + headers: { 'zlib' => 'true' } } ] # Report errors that occurred (if any) messages += sampler.errors.map do |error| @error_contract.validate!(error) { topic: Karafka::Web.config.topics.errors, - payload: error.to_json, + payload: Zlib::Deflate.deflate(error.to_json), # Always dispatch errors from the same process to the same partition - key: process_name + key: process_name, + headers: { 'zlib' => 'true' } } end - - return if messages.empty? produce(messages) # Clear the sampler so it tracks new state changes without previous once impacting # the data