Sha256: ecb8b42c01c97d16fe2b7b23b47ba10bf45d4b665ffd8da044657ac9a0a5707c

Contents?: true

Size: 1.83 KB

Versions: 27

Compression:

Stored size: 1.83 KB

Contents

# frozen_string_literal: true

module Karafka
  module Persistence
    # Local cache for routing topics
    # We use it in order not to build string instances and remap incoming topic upon each
    # message / message batches received
    class Topics
      # Thread.current scope under which we store topics data
      PERSISTENCE_SCOPE = :topics

      private_constant :PERSISTENCE_SCOPE

      class << self
        # @return [Concurrent::Hash] hash with all the topics from given groups
        def current
          Thread.current[PERSISTENCE_SCOPE] ||= Concurrent::Hash.new do |hash, key|
            hash[key] = Concurrent::Hash.new
          end
        end

        # @param group_id [String] group id for which we fetch a topic representation
        # @param raw_topic_name [String] raw topic name (before remapping) for which we fetch a
        #   topic representation
        # @return [Karafka::Routing::Topics] remapped topic representation that can be used further
        #   on when working with given parameters
        def fetch(group_id, raw_topic_name)
          current[group_id][raw_topic_name] ||= begin
            # We map from incoming topic name, as it might be namespaced, etc.
            # @see topic_mapper internal docs
            mapped_topic_name = Karafka::App.config.topic_mapper.incoming(raw_topic_name)
            Routing::Router.find("#{group_id}_#{mapped_topic_name}")
          end
        end

        # Clears the whole topics cache for all the threads
        # This is used for in-development code reloading as we need to get rid of all the
        # preloaded and cached instances of objects to make it work
        def clear
          Thread
            .list
            .select { |thread| thread[PERSISTENCE_SCOPE] }
            .each { |thread| thread[PERSISTENCE_SCOPE].clear }
        end
      end
    end
  end
end

Version data entries

27 entries across 27 versions & 1 rubygems

Version Path
karafka-1.4.15 lib/karafka/persistence/topics.rb
karafka-1.4.14 lib/karafka/persistence/topics.rb
karafka-1.4.13 lib/karafka/persistence/topics.rb
karafka-1.4.12 lib/karafka/persistence/topics.rb
karafka-1.4.11 lib/karafka/persistence/topics.rb
karafka-1.4.10 lib/karafka/persistence/topics.rb
karafka-1.4.9 lib/karafka/persistence/topics.rb
karafka-1.4.8 lib/karafka/persistence/topics.rb
karafka-1.4.7 lib/karafka/persistence/topics.rb
karafka-1.4.6 lib/karafka/persistence/topics.rb
karafka-1.4.5 lib/karafka/persistence/topics.rb
karafka-1.4.4 lib/karafka/persistence/topics.rb
karafka-1.4.3 lib/karafka/persistence/topics.rb
karafka-1.4.2 lib/karafka/persistence/topics.rb
karafka-1.4.1 lib/karafka/persistence/topics.rb
karafka-1.4.0 lib/karafka/persistence/topics.rb
karafka-1.4.0.rc2 lib/karafka/persistence/topics.rb
karafka-1.4.0.rc1 lib/karafka/persistence/topics.rb
karafka-1.3.7 lib/karafka/persistence/topics.rb
karafka-1.3.6 lib/karafka/persistence/topics.rb