Sha256: fac0cf0398ee7ee0b40fd5a5075c608e5746e18b548753332d7bf81c40e61c28
Contents?: true
Size: 792 Bytes
Versions: 5
Compression:
Stored size: 792 Bytes
Contents
# frozen_string_literal: true

module Karafka
  module Web
    module Processing
      module Consumers
        # Fetches the current consumer processes aggregated state
        class State
          class << self
            # Fetch the current consumers state that is expected to exist
            #
            # @return [Hash] last (current) aggregated processes state
            def current!
              state_message = ::Karafka::Admin.read_topic(
                Karafka::Web.config.topics.consumers.states,
                0,
                1
              ).last

              return state_message.payload if state_message

              raise(::Karafka::Web::Errors::Processing::MissingConsumersStateError)
            end
          end
        end
      end
    end
  end
end
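The "read the last message, return its payload, otherwise raise" pattern used by `current!` can be tried without a Kafka cluster. The following is only a minimal sketch under stated assumptions: `MissingStateError`, `Message`, `current_state!`, and the `reader` lambda are hypothetical stand-ins for illustration and are not part of Karafka or this gem.

```ruby
# frozen_string_literal: true

# Hypothetical stand-ins so the sketch runs without Karafka or Kafka.
MissingStateError = Class.new(StandardError)
Message = Struct.new(:payload)

# Mirrors the shape of State.current!: take the newest message from the
# reader and return its payload, or raise when nothing was read.
# `reader` is an assumed callable that returns an array of messages.
def current_state!(reader)
  state_message = reader.call.last

  return state_message.payload if state_message

  raise MissingStateError
end

# A reader that yields one state message, and one that yields none.
populated_reader = -> { [Message.new({ processes: 2 })] }
empty_reader = -> { [] }

state = current_state!(populated_reader)
puts "state: #{state.inspect}"

begin
  current_state!(empty_reader)
rescue MissingStateError
  puts 'no consumers state available yet'
end
```

Raising instead of returning `nil` fits the "expected to exist" contract in the method comment: a caller that depends on the state gets a specific error to rescue rather than a `nil` that fails later.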
Version data entries
5 entries across 5 versions & 1 rubygems