Sha256: f494b4be847c5b84f95d71cbceecb21de94c6dad888584c126673a234335053a

Contents?: true

Size: 1.78 KB

Versions: 26

Compression:

Stored size: 1.78 KB

Contents

# frozen_string_literal: true

module Karafka
  # Namespace for the caching layers that Karafka components keep in Thread.current
  module Persistence
    # Cache of consumer instances, keyed first by topic and then by partition, so that the
    # same consumer instance is reused for a given topic and partition instead of being
    # rebuilt on every fetch
    class Consumers
      # Key in Thread.current under which the consumers cache is kept
      PERSISTENCE_SCOPE = :consumers

      private_constant :PERSISTENCE_SCOPE

      # Provides the consumers cache stored in Thread.current, creating it on first use.
      # Looking up a topic that is not yet present registers an empty per-partition hash
      # for that topic.
      # @return [Concurrent::Hash] topic => (partition => consumer instance) cache
      def self.current
        Thread.current[PERSISTENCE_SCOPE] ||= Concurrent::Hash.new do |cache, topic|
          cache[topic] = Concurrent::Hash.new
        end
      end

      # Fetches the cached consumer for the given topic and partition. When there is none
      # yet, a new one is built from the topic consumer class and stored in the cache.
      # @param topic [Karafka::Routing::Topic] topic for which we want the consumer
      # @param partition [Integer] number of the partition for which we want the consumer
      # @return [Karafka::BaseConsumer] base consumer descendant
      def self.fetch(topic, partition)
        per_partition = current[topic]
        per_partition[partition] ||= topic.consumer.new(topic)
      end

      # Empties the consumers cache of every thread that has one
      # @note This is used to reload consumers instances when code reloading in development
      #   mode is present. This should not be used in production.
      # @return [Array<Thread>] threads that had a consumers cache
      def self.clear
        caching_threads = Thread.list.select { |candidate| candidate[PERSISTENCE_SCOPE] }
        caching_threads.each { |owner| owner[PERSISTENCE_SCOPE].clear }
      end
    end
  end
end

Version data entries

26 entries across 26 versions & 1 rubygems

Version Path
karafka-1.4.15 lib/karafka/persistence/consumers.rb
karafka-1.4.14 lib/karafka/persistence/consumers.rb
karafka-1.4.13 lib/karafka/persistence/consumers.rb
karafka-1.4.12 lib/karafka/persistence/consumers.rb
karafka-1.4.11 lib/karafka/persistence/consumers.rb
karafka-1.4.10 lib/karafka/persistence/consumers.rb
karafka-1.4.9 lib/karafka/persistence/consumers.rb
karafka-1.4.8 lib/karafka/persistence/consumers.rb
karafka-1.4.7 lib/karafka/persistence/consumers.rb
karafka-1.4.6 lib/karafka/persistence/consumers.rb
karafka-1.4.5 lib/karafka/persistence/consumers.rb
karafka-1.4.4 lib/karafka/persistence/consumers.rb
karafka-1.4.3 lib/karafka/persistence/consumers.rb
karafka-1.4.2 lib/karafka/persistence/consumers.rb
karafka-1.4.1 lib/karafka/persistence/consumers.rb
karafka-1.4.0 lib/karafka/persistence/consumers.rb
karafka-1.4.0.rc2 lib/karafka/persistence/consumers.rb
karafka-1.4.0.rc1 lib/karafka/persistence/consumers.rb
karafka-1.3.7 lib/karafka/persistence/consumers.rb
karafka-1.3.6 lib/karafka/persistence/consumers.rb