Sha256: 4c18f2cb4eb42e019e7d61bb99bab512ea4cbe93ee9f95d815ddf9b558561bb5
Contents?: true
Size: 1013 Bytes
Versions: 21
Compression:
Stored size: 1013 Bytes
Contents
# frozen_string_literal: true

module Karafka
  module Web
    module Processing
      module Consumers
        # Fetches the current consumer processes aggregated state
        class State
          extend ::Karafka::Core::Helpers::Time

          class << self
            # Try bootstrapping from the current state from Kafka if exists and if not, just use
            # a blank state. Blank state will not be flushed because materialization into Kafka
            # happens only after first report is received.
            #
            # @return [Hash, false] last (current) aggregated processes state or false if no
            #   state is available
            def current
              state_message = ::Karafka::Admin.read_topic(
                Karafka::Web.config.topics.consumers.states,
                0,
                1
              ).last

              state_message ? state_message.payload : { processes: {}, stats: {} }
            end
          end
        end
      end
    end
  end
end