Sha256: eba17a362fa082a59804fd9cb469e2f0aa501322d5b728d410eb4a0b98213a8b
Contents?: true
Size: 1.76 KB
Versions: 24
Compression:
Stored size: 1.76 KB
Contents
# frozen_string_literal: true

module Karafka
  # Namespace for all elements related to requests routing
  module Routing
    # Karafka framework Router for routing incoming messages to proper consumers
    # @note Since Kafka does not provide namespaces or modules for topics, they all have "flat"
    #   structure so all the routes are being stored in a single level array
    module Router
      # Finds first reference of a given topic based on provided lookup attribute
      # @param lookup [Hash<Symbol, String>] hash with attribute - value key pairs
      # @return [Karafka::Routing::Topic, nil] proper route details or nil if not found
      def find_by(lookup)
        App.consumer_groups.each do |consumer_group|
          consumer_group.topics.each do |topic|
            return topic if lookup.all? do |attribute, value|
              topic.public_send(attribute) == value
            end
          end
        end

        nil
      end

      # Finds the topic by name (in any consumer group) and, if not present, will build a new
      # representation of the topic with the defaults and default deserializer.
      #
      # This is used in places where we may operate on topics that are not part of the routing
      # but we want to do something on them (display data, iterate over, etc)
      # @param name [String] name of the topic we are looking for
      # @return [Karafka::Routing::Topic]
      #
      # @note Please note, that in case of a new topic, it will have a newly built consumer group
      #   as well, that is not part of the routing.
      def find_or_initialize_by_name(name)
        find_by(name: name) || Topic.new(name, ConsumerGroup.new(name))
      end

      module_function :find_by
      module_function :find_or_initialize_by_name
    end
  end
end
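The lookup behavior described in the comments above can be tried outside a Karafka application with a minimal sketch. Everything in it is an assumption made for illustration: `RouterSketch`, its `Topic` and `ConsumerGroup` structs, and the explicit `consumer_groups` argument are hypothetical stand-ins, not Karafka's real classes or signatures. The real `Router` reads the groups from `App.consumer_groups` instead.

```ruby
# Hypothetical stand-in for Karafka::Routing::Router, for illustration only.
module RouterSketch
  # Simplified stand-ins for the real routing objects (assumed shapes).
  Topic = Struct.new(:name, :consumer_group)
  ConsumerGroup = Struct.new(:name, :topics)

  module_function

  # Returns the first topic whose attributes all match the lookup hash,
  # or nil when no topic matches. Groups are passed in explicitly here.
  def find_by(consumer_groups, lookup)
    consumer_groups.each do |consumer_group|
      consumer_group.topics.each do |topic|
        matches = lookup.all? { |attribute, value| topic.public_send(attribute) == value }
        return topic if matches
      end
    end

    nil
  end

  # Falls back to a freshly built topic (with its own new consumer group)
  # when the name is not present in any of the given groups.
  def find_or_initialize_by_name(consumer_groups, name)
    find_by(consumer_groups, name: name) ||
      Topic.new(name, ConsumerGroup.new(name, []))
  end
end

group = RouterSketch::ConsumerGroup.new('app', [])
group.topics << RouterSketch::Topic.new('events', group)

found = RouterSketch.find_by([group], name: 'events')
built = RouterSketch.find_or_initialize_by_name([group], 'unrouted')

puts found.name                 # topic that is part of the routing
puts built.consumer_group.name  # group built on the fly, not part of the routing
```

As in the original, a topic built by the fallback path belongs to a consumer group that is not registered anywhere, so a later `find_by` for the same name still returns `nil`.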
Version data entries
24 entries across 24 versions & 1 rubygems
Version | Path |
---|---|
karafka-2.2.3 | lib/karafka/routing/router.rb |
karafka-2.2.2 | lib/karafka/routing/router.rb |
karafka-2.2.1 | lib/karafka/routing/router.rb |
karafka-2.2.0 | lib/karafka/routing/router.rb |