# frozen_string_literal: true # This Karafka component is a Pro component under a commercial license. # This Karafka component is NOT licensed under LGPL. # # All of the commercial components are present in the lib/karafka/pro directory of this # repository and their usage requires commercial license agreement. # # Karafka has also commercial-friendly license, commercial support and commercial components. # # By sending a pull request to the pro components, you are agreeing to transfer the copyright of # your code to Maciej Mensfeld. module Karafka module Pro module Routing module Features class Patterns < Base # Detects if a given topic matches any of the patterns and if so, injects it into the # given subscription group routing # # @note This is NOT thread-safe and should run in a thread-safe context that warranties # that there won't be any race conditions class Detector # Mutex for making sure that we do not modify same consumer group in runtime at the # same time from multiple subscription groups if they operate in a multiplexed mode MUTEX = Mutex.new private_constant :MUTEX # Checks if the provided topic matches any of the patterns and when detected, expands # the routing with it. # # @param sg_topics [Array<Karafka::Routing::Topic>] given subscription group routing # topics. # @param new_topic [String] new topic that we have detected def expand(sg_topics, new_topic) MUTEX.synchronize do sg_topics .map(&:patterns) .select(&:active?) .select(&:matcher?) .map(&:pattern) .then { |pts| pts.empty? ? return : pts } .then { |pts| Patterns.new(pts) } .find(new_topic) .then { |pattern| pattern || return } .then { |pattern| install(pattern, new_topic, sg_topics) } end end private # Adds the discovered topic into the routing # # @param pattern [Karafka::Pro::Routing::Features::Patterns::Pattern] matched pattern # @param discovered_topic [String] topic that we discovered that should be part of the # routing from now on. # @param sg_topics [Array<Karafka::Routing::Topic>] def install(pattern, discovered_topic, sg_topics) consumer_group = pattern.topic.consumer_group # Build new topic and register within the consumer group topic = consumer_group.public_send(:topic=, discovered_topic, &pattern.config) topic.patterns(active: true, type: :discovered) # Assign the appropriate subscription group to this topic topic.subscription_group = pattern.topic.subscription_group # Inject into subscription group topics array always, so everything is reflected # there but since it is not active, will not be picked sg_topics << topic end end end end end end end