lib/karafka/web/tracking/consumers/reporter.rb in karafka-web-0.6.3 vs lib/karafka/web/tracking/consumers/reporter.rb in karafka-web-0.7.0
- old
+ new
@@ -30,10 +30,18 @@
# Dispatches the current state from sampler to appropriate topics
#
# @param forced [Boolean] should we report bypassing the time frequency or should we
# report only in case we would not send the report for long enough time.
def report(forced: false)
+ # Do not even mutex if not needed
+ return unless report?(forced)
+
+ # We run this sampling before the mutex so sampling does not stop things in case
+ # other threads would need this mutex. This can take up to 25ms and we do not want to
+ # block during this time
+ sampler.sample
+
MUTEX.synchronize do
# Start background thread only when needed
# This prevents us from starting it too early or for non-consumer processes where
# Karafka is being included
async_call unless @running
@@ -50,28 +58,28 @@
# Report consumers statuses
messages = [
{
topic: ::Karafka::Web.config.topics.consumers.reports,
- payload: report.to_json,
+ payload: Zlib::Deflate.deflate(report.to_json),
key: process_name,
- partition: 0
+ partition: 0,
+ headers: { 'zlib' => 'true' }
}
]
# Report errors that occurred (if any)
messages += sampler.errors.map do |error|
@error_contract.validate!(error)
{
topic: Karafka::Web.config.topics.errors,
- payload: error.to_json,
+ payload: Zlib::Deflate.deflate(error.to_json),
# Always dispatch errors from the same process to the same partition
- key: process_name
+ key: process_name,
+ headers: { 'zlib' => 'true' }
}
end
-
- return if messages.empty?
produce(messages)
# Clear the sampler so it tracks new state changes without previous once impacting
# the data