Sha256: 788bf99dc2e5bab8e5ba28516ac2f15146e92df18a5f24af93df69c9b4c0a3a2

Contents?: true

Size: 1.79 KB

Versions: 26

Compression:

Stored size: 1.79 KB

Contents

# frozen_string_literal: true

module Karafka
  module Processing
    # Namespace of the Inline Insights feature "non routing" related components
    #
    # @note We use both `#insights` because it is the feature name but also `#statistics` to make
    #   it consistent with the fact that we publish and operate on statistics. User can pick
    #   whichever name they prefer.
    module InlineInsights
      # Module that adds extra methods to the consumer that allow us to fetch the insights
      module Consumer
        # @return [Hash] empty hash or hash with given partition insights if already present
        # @note We cache insights on the consumer, as in some scenarios we may no longer have them
        #   inside the Tracker, for example under involuntary revocation, incoming statistics may
        #   no longer have lost partition insights. Since we want to be consistent during single
        #   batch operations, we want to ensure, that if we have insights they are available
        #   throughout the whole processing.
        def insights
          insights = Tracker.find(topic, partition)

          # If we no longer have new insights but we still have them locally, we can use them
          return @insights if @insights && insights.empty?
          # If insights are still the same, we can use them
          return @insights if @insights.equal?(insights)

          # If we've received new insights that are not empty, we can cache them
          @insights = insights
        end

        # @return [Boolean] true if there are insights to work with, otherwise false
        def insights?
          !insights.empty?
        end

        alias statistics insights
        alias statistics? insights?
        alias inline_insights insights
        alias inline_insights? insights?
      end
    end
  end
end

Version data entries

26 entries across 26 versions & 1 rubygems

Version Path
karafka-2.4.13 lib/karafka/processing/inline_insights/consumer.rb
karafka-2.4.12 lib/karafka/processing/inline_insights/consumer.rb
karafka-2.4.11 lib/karafka/processing/inline_insights/consumer.rb
karafka-2.4.10 lib/karafka/processing/inline_insights/consumer.rb
karafka-2.4.9 lib/karafka/processing/inline_insights/consumer.rb
karafka-2.4.8 lib/karafka/processing/inline_insights/consumer.rb
karafka-2.4.7 lib/karafka/processing/inline_insights/consumer.rb
karafka-2.4.6 lib/karafka/processing/inline_insights/consumer.rb
karafka-2.4.5 lib/karafka/processing/inline_insights/consumer.rb
karafka-2.4.4 lib/karafka/processing/inline_insights/consumer.rb
karafka-2.4.3 lib/karafka/processing/inline_insights/consumer.rb
karafka-2.4.0 lib/karafka/processing/inline_insights/consumer.rb
karafka-2.4.0.rc1 lib/karafka/processing/inline_insights/consumer.rb
karafka-2.3.4 lib/karafka/processing/inline_insights/consumer.rb
karafka-2.4.0.beta2 lib/karafka/processing/inline_insights/consumer.rb
karafka-2.4.0.beta1 lib/karafka/processing/inline_insights/consumer.rb
karafka-2.3.3 lib/karafka/processing/inline_insights/consumer.rb
karafka-2.3.2 lib/karafka/processing/inline_insights/consumer.rb
karafka-2.3.1 lib/karafka/processing/inline_insights/consumer.rb
karafka-2.3.0 lib/karafka/processing/inline_insights/consumer.rb