# frozen_string_literal: true

module Karafka
  # Admin actions that we can perform via Karafka on our Kafka cluster
  #
  # @note It always initializes a new admin instance as we want to ensure it is always closed
  #   Since admin actions are not performed that often, that should be ok.
  #
  # @note It always uses the primary defined cluster and does not support multi-cluster work.
  #   Cluster on which operations are performed can be changed via `admin.kafka` config, however
  #   there is no multi-cluster runtime support.
  module Admin
    # More or less number of seconds of 1 hundred years
    # Used for time referencing that does not have to be accurate but needs to be big
    HUNDRED_YEARS = 100 * 365.25 * 24 * 60 * 60

    private_constant :HUNDRED_YEARS

    class << self
      # Allows us to read messages from the topic
      #
      # @param name [String, Symbol] topic name
      # @param partition [Integer] partition
      # @param count [Integer] how many messages we want to get at most
      # @param start_offset [Integer, Time] offset from which we should start. If -1 is provided
      #   (default) we will start from the latest offset. If time is provided, the appropriate
      #   offset will be resolved. If negative beyond -1 is provided, we move backwards more.
      # @param settings [Hash] kafka extra settings (optional)
      #
      # @return [Array<Karafka::Messages::Message>] array with messages
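      #
      # The examples below are usage sketches; the 'events' topic name is an illustrative
      # placeholder and not part of this codebase
      #
      # @example Read the last 10 messages from partition 0
      #   Karafka::Admin.read_topic('events', 0, 10)
      #
      # @example Read 20 messages starting from a point in time
      #   Karafka::Admin.read_topic('events', 0, 20, Time.now - 3600)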
      def read_topic(name, partition, count, start_offset = -1, settings = {})
        messages = []
        tpl = Rdkafka::Consumer::TopicPartitionList.new
        low_offset, high_offset = nil

        with_consumer(settings) do |consumer|
          # Convert the time offset (if needed)
          start_offset = resolve_offset(consumer, name.to_s, partition, start_offset)

          low_offset, high_offset = consumer.query_watermark_offsets(name, partition)

          # Select offset dynamically if -1 or less and move backwards with the negative
          # offset, allowing to start from N messages back from high-watermark
          start_offset = high_offset - count - start_offset.abs + 1 if start_offset.negative?
          start_offset = low_offset if start_offset.negative?

          # Build the requested range - since first element is on the start offset we need to
          # subtract one from requested count to end up with expected number of elements
          requested_range = (start_offset..start_offset + (count - 1))
          # Establish theoretical available range. Note that this does not handle cases related
          # to log retention or compaction
          available_range = (low_offset..(high_offset - 1))
          # Select only offsets that we can actually read. This removes all the potential
          # offsets that are below the low watermark offset
          possible_range = requested_range.select { |offset| available_range.include?(offset) }

          start_offset = possible_range.first
          count = possible_range.count

          tpl.add_topic_and_partitions_with_offsets(name, partition => start_offset)
          consumer.assign(tpl)

          # We should poll as long as we don't have all the messages that we need or as long as
          # we do not read all the messages from the topic
          loop do
            # If we've got as many messages as we've wanted, stop
            break if messages.size >= count

            message = consumer.poll(200)

            next unless message

            # If the message we've got is beyond the requested range, stop
            break unless possible_range.include?(message.offset)

            messages << message
          rescue Rdkafka::RdkafkaError => e
            # End of partition
            break if e.code == :partition_eof

            raise e
          end
        end

        # Use topic from routes if we can match it or create a dummy one
        # Dummy one is used in case we cannot match the topic with routes. This can happen
        # when admin API is used to read topics that are not part of the routing
        topic = ::Karafka::Routing::Router.find_or_initialize_by_name(name)

        messages.map! do |message|
          Messages::Builders::Message.call(
            message,
            topic,
            Time.now
          )
        end
      end

      # Creates Kafka topic with given settings
      #
      # @param name [String] topic name
      # @param partitions [Integer] number of partitions we expect
      # @param replication_factor [Integer] number of replicas
      # @param topic_config [Hash] topic config details as described here:
      #   https://kafka.apache.org/documentation/#topicconfigs
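      #
      # Usage sketches; topic names and the config below are illustrative placeholders
      #
      # @example Create a topic with 6 partitions and replication factor of 3
      #   Karafka::Admin.create_topic('events', 6, 3)
      #
      # @example Create a topic with an extra topic config
      #   Karafka::Admin.create_topic('states', 1, 1, 'cleanup.policy' => 'compact')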
      def create_topic(name, partitions, replication_factor, topic_config = {})
        with_admin do |admin|
          handler = admin.create_topic(name, partitions, replication_factor, topic_config)

          with_re_wait(
            -> { handler.wait(max_wait_timeout: app_config.admin.max_wait_time) },
            -> { topics_names.include?(name) }
          )
        end
      end

      # Deletes a given topic
      #
      # @param name [String] topic name
      def delete_topic(name)
        with_admin do |admin|
          handler = admin.delete_topic(name)

          with_re_wait(
            -> { handler.wait(max_wait_timeout: app_config.admin.max_wait_time) },
            -> { !topics_names.include?(name) }
          )
        end
      end

      # Creates more partitions for a given topic
      #
      # @param name [String] topic name
      # @param partitions [Integer] total number of partitions we expect to end up with
      def create_partitions(name, partitions)
        with_admin do |admin|
          handler = admin.create_partitions(name, partitions)

          with_re_wait(
            -> { handler.wait(max_wait_timeout: app_config.admin.max_wait_time) },
            -> { topic_info(name).fetch(:partition_count) >= partitions }
          )
        end
      end

      # Moves the offset on a given consumer group and provided topic to the requested location
      #
      # @param consumer_group_id [String] id of the consumer group for which we want to move the
      #   existing offset
      # @param topics_with_partitions_and_offsets [Hash] Hash with list of topics and settings to
      #   where to move given consumer. It allows us to move particular partitions or whole topics
      #   if we want to reset all partitions to, for example, a point in time.
      #
      # @note This method should **not** be executed on a running consumer group as it creates a
      #   "fake" consumer and uses it to move offsets.
      #
      # @example Move a single topic partition nr 1 offset to 100
      #   Karafka::Admin.seek_consumer_group('group-id', { 'topic' => { 1 => 100 } })
      #
      # @example Move offsets on all partitions of a topic to 100
      #   Karafka::Admin.seek_consumer_group('group-id', { 'topic' => 100 })
      #
      # @example Move offset to 5 seconds ago on partition 2
      #   Karafka::Admin.seek_consumer_group('group-id', { 'topic' => { 2 => 5.seconds.ago } })
      #
      # @example Move to the earliest offset on all the partitions of a topic
      #   Karafka::Admin.seek_consumer_group('group-id', { 'topic' => 'earliest' })
      #
      # @example Move to the latest (high-watermark) offset on all the partitions of a topic
      #   Karafka::Admin.seek_consumer_group('group-id', { 'topic' => 'latest' })
      #
      # @example Move offset of a single partition to earliest
      #   Karafka::Admin.seek_consumer_group('group-id', { 'topic' => { 1 => 'earliest' } })
      #
      # @example Move offset of a single partition to latest
      #   Karafka::Admin.seek_consumer_group('group-id', { 'topic' => { 1 => 'latest' } })
      def seek_consumer_group(consumer_group_id, topics_with_partitions_and_offsets)
        tpl_base = {}

        # Normalize the data so we always have all partitions and topics in the same format
        # That is in a format where we have topics and all partitions with their per partition
        # assigned offsets
        topics_with_partitions_and_offsets.each do |topic, partitions_with_offsets|
          tpl_base[topic] = {}

          if partitions_with_offsets.is_a?(Hash)
            tpl_base[topic] = partitions_with_offsets
          else
            topic_info(topic)[:partition_count].times do |partition|
              tpl_base[topic][partition] = partitions_with_offsets
            end
          end
        end

        tpl_base.each_value do |partitions|
          partitions.transform_values! do |position|
            # Support both symbol and string based references
            casted_position = position.is_a?(Symbol) ? position.to_s : position

            # This remap allows us to transform some special cases in a reference that can be
            # understood by Kafka
            case casted_position
            # Earliest is not always 0. When compacting/deleting it can be much later, that's why
            # we fetch the oldest possible offset
            when 'earliest'
              Time.now - HUNDRED_YEARS
            # Latest will always be the high-watermark offset and we can get it just by getting
            # a future position
            when 'latest'
              Time.now + HUNDRED_YEARS
            # Same as `'earliest'`
            when false
              Time.now - HUNDRED_YEARS
            # Regular offset case
            else
              position
            end
          end
        end

        tpl = Rdkafka::Consumer::TopicPartitionList.new
        # In case of time based location, we need to do a pre-resolution, that's why we keep it
        # separately
        time_tpl = Rdkafka::Consumer::TopicPartitionList.new

        # Distribute properly the offset type
        tpl_base.each do |topic, partitions_with_offsets|
          partitions_with_offsets.each do |partition, offset|
            target = offset.is_a?(Time) ? time_tpl : tpl
            # We reverse and uniq to make sure that potentially duplicated references are removed
            # in such a way that the newest stays
            target.to_h[topic] ||= []
            target.to_h[topic] << Rdkafka::Consumer::Partition.new(partition, offset)
            target.to_h[topic].reverse!
            target.to_h[topic].uniq!(&:partition)
            target.to_h[topic].reverse!
          end
        end

        settings = { 'group.id': consumer_group_id }

        with_consumer(settings) do |consumer|
          # If we have any time based stuff to resolve, we need to do it prior to commits
          unless time_tpl.empty?
            real_offsets = consumer.offsets_for_times(time_tpl)

            real_offsets.to_h.each do |name, results|
              results.each do |result|
                raise(Errors::InvalidTimeBasedOffsetError) unless result

                partition = result.partition

                # Negative offset means we're beyond last message and we need to query for the
                # high watermark offset to get the most recent offset and move there
                if result.offset.negative?
                  _, offset = consumer.query_watermark_offsets(name, result.partition)
                else
                  # If we get an offset, it means there existed a message close to this time
                  # location
                  offset = result.offset
                end

                # Since now we have proper offsets, we can add this to the final tpl for commit
                tpl.to_h[name] ||= []
                tpl.to_h[name] << Rdkafka::Consumer::Partition.new(partition, offset)
                tpl.to_h[name].reverse!
                tpl.to_h[name].uniq!(&:partition)
                tpl.to_h[name].reverse!
              end
            end
          end

          consumer.commit_offsets(tpl, async: false)
        end
      end

      # Removes given consumer group (if exists)
      #
      # @param consumer_group_id [String] consumer group name
      #
      # @note This method should not be used on a running consumer group as it will not yield
      #   any results.
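      #
      # A usage sketch; the group id below is an illustrative placeholder
      #
      # @example Remove a consumer group by its id
      #   Karafka::Admin.delete_consumer_group('example_app_group')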
      def delete_consumer_group(consumer_group_id)
        with_admin do |admin|
          handler = admin.delete_group(consumer_group_id)
          handler.wait(max_wait_timeout: app_config.admin.max_wait_time)
        end
      end

      # Fetches the watermark offsets for a given topic partition
      #
      # @param name [String, Symbol] topic name
      # @param partition [Integer] partition
      # @return [Array<Integer, Integer>] low watermark offset and high watermark offset
      def read_watermark_offsets(name, partition)
        with_consumer do |consumer|
          consumer.query_watermark_offsets(name, partition)
        end
      end

      # Reads lags and offsets for given topics in the context of consumer groups defined in the
      # routing
      #
      # @param consumer_groups_with_topics [Hash<String, Array<String>>] hash with consumer groups
      #   names with array of topics to query per consumer group inside
      # @param active_topics_only [Boolean] if set to false, when we use routing topics, will
      #   select also topics that are marked as inactive in routing
      # @return [Hash<String, Hash<String, Hash<Integer, Hash>>>] hash where the top level keys
      #   are the consumer groups and values are hashes with topics and inside partitions with
      #   lags and offsets
      #
      # @note For topics that do not exist, topic details will be set to an empty hash
      #
      # @note For topics that exist but were never consumed by a given CG we set `-1` as lag and
      #   the offset on each of the partitions that were not consumed.
      #
      # @note This lag reporting is for committed lags and is "Kafka-centric", meaning that this
      #   represents lags from Kafka perspective and not the consumer. They may differ.
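      #
      # A usage sketch; group and topic names as well as the returned numbers are illustrative
      # placeholders
      #
      # @example Read lags and offsets for a selected consumer group and topic
      #   Karafka::Admin.read_lags_with_offsets({ 'example_group' => ['events'] })
      #   # => { 'example_group' => { 'events' => { 0 => { offset: 10, lag: 2 } } } }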
      def read_lags_with_offsets(consumer_groups_with_topics = {}, active_topics_only: true)
        # We first fetch all the topics with partitions count that exist in the cluster so we
        # do not query for topics that do not exist and so we can get partitions count for all
        # the topics we may need. The non-existent and not consumed will be filled at the end
        existing_topics = cluster_info.topics.map do |topic|
          [topic[:topic_name], topic[:partition_count]]
        end.to_h.freeze

        # If no expected CGs, we use all from routing that have active topics
        if consumer_groups_with_topics.empty?
          consumer_groups_with_topics = Karafka::App.routes.map do |cg|
            cg_topics = cg.topics.select do |cg_topic|
              active_topics_only ? cg_topic.active? : true
            end

            [cg.id, cg_topics.map(&:name)]
          end.to_h
        end

        # We make a copy because we will remove ones with non-existing topics
        # We keep original requested consumer groups with topics for later backfilling
        cgs_with_topics = consumer_groups_with_topics.dup
        cgs_with_topics.transform_values!(&:dup)

        # We can query only topics that do exist, this is why we are cleaning those that do not
        # exist
        cgs_with_topics.each_value do |requested_topics|
          requested_topics.delete_if { |topic| !existing_topics.include?(topic) }
        end

        groups_lags = Hash.new { |h, k| h[k] = {} }
        groups_offs = Hash.new { |h, k| h[k] = {} }

        cgs_with_topics.each do |cg, topics|
          # Do not add to tpl topics that do not exist
          next if topics.empty?

          tpl = Rdkafka::Consumer::TopicPartitionList.new

          with_consumer('group.id': cg) do |consumer|
            topics.each { |topic| tpl.add_topic(topic, existing_topics[topic]) }

            commit_offsets = consumer.committed(tpl)

            commit_offsets.to_h.each do |topic, partitions|
              groups_offs[cg][topic] = {}

              partitions.each do |partition|
                # -1 when no offset is stored
                groups_offs[cg][topic][partition.partition] = partition.offset || -1
              end
            end

            consumer.lag(commit_offsets).each do |topic, partitions_lags|
              groups_lags[cg][topic] = partitions_lags
            end
          end
        end

        consumer_groups_with_topics.each do |cg, topics|
          # Ensure the consumer group key exists even if none of its topics were consumed
          groups_lags[cg]

          topics.each do |topic|
            groups_lags[cg][topic] ||= {}

            next unless existing_topics.key?(topic)

            # We backfill because there is a case where our consumer group would consume for
            # example only one partition out of 20, rest needs to get -1
            existing_topics[topic].times do |partition_id|
              groups_lags[cg][topic][partition_id] ||= -1
            end
          end
        end

        merged = Hash.new { |h, k| h[k] = {} }

        groups_lags.each do |cg, topics|
          topics.each do |topic, partitions|
            merged[cg][topic] = {}

            partitions.each do |partition, lag|
              merged[cg][topic][partition] = {
                offset: groups_offs.fetch(cg).fetch(topic).fetch(partition),
                lag: lag
              }
            end
          end
        end

        merged
      end

      # @return [Rdkafka::Metadata] cluster metadata info
      def cluster_info
        with_admin(&:metadata)
      end

      # Returns basic topic metadata
      #
      # @param topic_name [String] name of the topic we're interested in
      # @return [Hash] topic metadata info hash
      # @raise [Rdkafka::RdkafkaError] `unknown_topic_or_part` if requested topic is not found
      #
      # @note This query is much more efficient than doing a full `#cluster_info` + topic lookup
      #   because it does not have to query for all the topics data but just the topic we're
      #   interested in
      def topic_info(topic_name)
        with_admin do |admin|
          admin
            .metadata(topic_name)
            .topics
            .find { |topic| topic[:topic_name] == topic_name }
        end
      end

      # Creates consumer instance and yields it. After usage it closes the consumer instance
      # This API can be used in other pieces of code and allows for low-level consumer usage
      #
      # @param settings [Hash] extra settings to customize consumer
      #
      # @note We always ship and yield a proxied consumer because admin API performance is not
      #   that relevant. That is, there are no high frequency calls that would have to be
      #   delegated
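      #
      # A usage sketch; the topic name used inside the block is an illustrative placeholder
      #
      # @example Use a short-lived consumer to query watermark offsets
      #   Karafka::Admin.with_consumer do |consumer|
      #     consumer.query_watermark_offsets('events', 0)
      #   end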
      def with_consumer(settings = {})
        bind_id = SecureRandom.uuid

        consumer = config(:consumer, settings).consumer(native_kafka_auto_start: false)
        bind_oauth(bind_id, consumer)

        consumer.start
        proxy = ::Karafka::Connection::Proxy.new(consumer)

        yield(proxy)
      ensure
        # Always unsubscribe the consumer just to be sure that no metadata requests are running
        # when we close it. This in theory should prevent some race conditions that originate
        # from librdkafka
        begin
          consumer&.unsubscribe
        # Ignore any errors and continue to close consumer despite them
        rescue Rdkafka::RdkafkaError
          nil
        end

        consumer&.close
        unbind_oauth(bind_id)
      end

      # Creates admin instance and yields it. After usage it closes the admin instance
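      #
      # A usage sketch of low-level access via the yielded proxy
      #
      # @example Fetch raw cluster metadata with a short-lived admin instance
      #   Karafka::Admin.with_admin do |admin|
      #     admin.metadata
      #   end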
      def with_admin
        bind_id = SecureRandom.uuid

        admin = config(:producer, {}).admin(native_kafka_auto_start: false)
        bind_oauth(bind_id, admin)

        admin.start
        proxy = ::Karafka::Connection::Proxy.new(admin)

        yield(proxy)
      ensure
        admin&.close
        unbind_oauth(bind_id)
      end

      private

      # Adds a new callback for given rdkafka instance for oauth token refresh (if needed)
      #
      # @param id [String, Symbol] unique (for the lifetime of instance) id that we use for
      #   callback referencing
      # @param instance [Rdkafka::Consumer, Rdkafka::Admin] rdkafka instance to be used to set
      #   appropriate oauth token when needed
      def bind_oauth(id, instance)
        ::Karafka::Core::Instrumentation.oauthbearer_token_refresh_callbacks.add(
          id,
          Instrumentation::Callbacks::OauthbearerTokenRefresh.new(
            instance
          )
        )
      end

      # Removes the callback from no longer used instance
      #
      # @param id [String, Symbol] unique (for the lifetime of instance) id that we use for
      #   callback referencing
      def unbind_oauth(id)
        ::Karafka::Core::Instrumentation.oauthbearer_token_refresh_callbacks.delete(id)
      end

      # @return [Array<String>] topics names
      def topics_names
        cluster_info.topics.map { |topic| topic.fetch(:topic_name) }
      end

      # There are some cases where rdkafka admin operations finish successfully but without the
      # callback being triggered to materialize the post-promise object. Until this is fixed, we
      # can figure out that the operation we wanted to perform finished successfully by checking
      # that the effect of the command (new topic, more partitions, etc.) is applied. Exactly for
      # that we use the breaker. If we get a timeout, we can check that what we wanted to achieve
      # has happened via the breaker check, hence we do not need to wait any longer.
      #
      # @param handler [Proc] the wait handler operation
      # @param breaker [Proc] extra condition upon timeout that indicates things were finished ok
      def with_re_wait(handler, breaker)
        attempt ||= 0
        attempt += 1

        handler.call

        # If the breaker condition is not met, it means that the requested change was applied but
        # is still not visible and we need to wait
        raise(Errors::ResultNotVisibleError) unless breaker.call
      rescue Rdkafka::AbstractHandle::WaitTimeoutError, Errors::ResultNotVisibleError
        return if breaker.call

        retry if attempt <= app_config.admin.max_attempts

        raise
      end

      # @param type [Symbol] type of config we want
      # @param settings [Hash] extra settings for config (if needed)
      # @return [::Rdkafka::Config] rdkafka config
      def config(type, settings)
        app_config
          .kafka
          .then(&:dup)
          .merge(app_config.admin.kafka)
          .tap { |config| config[:'group.id'] = app_config.admin.group_id }
          # We merge after setting the group id so it can be altered if needed
          # In general in admin we only should alter it when we need to impersonate a given
          # consumer group or do something similar
          .merge!(settings)
          .then { |config| Karafka::Setup::AttributesMap.public_send(type, config) }
          .then { |config| ::Rdkafka::Config.new(config) }
      end

      # Resolves the offset if offset is in a time format. Otherwise returns the offset without
      # resolving.
      #
      # @param consumer [::Rdkafka::Consumer]
      # @param name [String, Symbol] expected topic name
      # @param partition [Integer]
      # @param offset [Integer, Time]
      # @return [Integer] expected offset
      def resolve_offset(consumer, name, partition, offset)
        if offset.is_a?(Time)
          tpl = ::Rdkafka::Consumer::TopicPartitionList.new
          tpl.add_topic_and_partitions_with_offsets(
            name, partition => offset
          )

          real_offsets = consumer.offsets_for_times(tpl)
          detected_offset = real_offsets
                            .to_h
                            .fetch(name)
                            .find { |p_data| p_data.partition == partition }

          detected_offset&.offset || raise(Errors::InvalidTimeBasedOffsetError)
        else
          offset
        end
      end

      # @return [Karafka::Core::Configurable::Node] root node config
      def app_config
        ::Karafka::App.config
      end
    end
  end
end