Sha256: e926e18f64f1da5a30de5bc0761ffd1e9db1efb6e88879202b7110efbb38636f

Contents?: true

Size: 1.22 KB

Versions: 20

Compression:

Stored size: 1.22 KB

Contents

# frozen_string_literal: true

module Karafka
  module Web
    module Processing
      module Consumers
        # Fetches the current consumer processes aggregated state
        class State
          class << self
            # Fetch the current consumers state that is expected to exist
            #
            # @return [Hash] last (current) aggregated processes state
            def current!
              state_message = ::Karafka::Admin.read_topic(
                Karafka::Web.config.topics.consumers.states,
                0,
                # We need to take more in case there would be transactions running.
                # In theory we could take two but this compensates for any involuntary
                # revocations and cases where two producers would write to the same state
                5
              ).last

              return state_message.payload if state_message

              raise(::Karafka::Web::Errors::Processing::MissingConsumersStateError)
            rescue Rdkafka::RdkafkaError => e
              raise(e) unless e.code == :unknown_partition

              raise(::Karafka::Web::Errors::Processing::MissingConsumersStatesTopicError)
            end
          end
        end
      end
    end
  end
end

Version data entries

20 entries across 20 versions & 1 rubygems

Version Path
karafka-web-0.10.4 lib/karafka/web/processing/consumers/state.rb
karafka-web-0.10.3 lib/karafka/web/processing/consumers/state.rb
karafka-web-0.10.2 lib/karafka/web/processing/consumers/state.rb
karafka-web-0.10.1 lib/karafka/web/processing/consumers/state.rb
karafka-web-0.10.0 lib/karafka/web/processing/consumers/state.rb
karafka-web-0.10.0.rc2 lib/karafka/web/processing/consumers/state.rb
karafka-web-0.10.0.rc1 lib/karafka/web/processing/consumers/state.rb
karafka-web-0.10.0.beta1 lib/karafka/web/processing/consumers/state.rb
karafka-web-0.9.1 lib/karafka/web/processing/consumers/state.rb
karafka-web-0.9.0 lib/karafka/web/processing/consumers/state.rb
karafka-web-0.9.0.rc3 lib/karafka/web/processing/consumers/state.rb
karafka-web-0.9.0.rc2 lib/karafka/web/processing/consumers/state.rb
karafka-web-0.9.0.rc1 lib/karafka/web/processing/consumers/state.rb
karafka-web-0.8.2 lib/karafka/web/processing/consumers/state.rb
karafka-web-0.8.1 lib/karafka/web/processing/consumers/state.rb
karafka-web-0.8.0 lib/karafka/web/processing/consumers/state.rb
karafka-web-0.8.0.rc1 lib/karafka/web/processing/consumers/state.rb
karafka-web-0.7.10 lib/karafka/web/processing/consumers/state.rb
karafka-web-0.7.9 lib/karafka/web/processing/consumers/state.rb
karafka-web-0.7.8 lib/karafka/web/processing/consumers/state.rb