# frozen_string_literal: true

# This Karafka component is a Pro component under a commercial license.
# This Karafka component is NOT licensed under LGPL.
#
# All of the commercial components are present in the lib/karafka/pro directory of this
# repository and their usage requires commercial license agreement.
#
# Karafka has also commercial-friendly license, commercial support and commercial components.
#
# By sending a pull request to the pro components, you are agreeing to transfer the copyright of
# your code to Maciej Mensfeld.

module Karafka
  module Pro
    module Processing
      # Manager that keeps track of our offsets with the virtualization layer that are local
      # to a given partition assignment. It allows for easier offset management in virtual
      # partition cases, as it provides us the ability to mark messages as consumed and to move
      # the real offset, which lags behind the virtual marks, as expected.
      #
      # @note We still use the regular coordinator "real" offset management as we want to have
      #   them as separated as possible because the real seek offset management is also used for
      #   pausing, filtering and others and should not be impacted by the virtual one
      #
      # @note This manager is **not** thread-safe by itself. It should operate from coordinator
      #   locked locations.
      class VirtualOffsetManager
        attr_reader :groups

        # @param topic [String]
        # @param partition [Integer]
        #
        # @note We need topic and partition because we use a seek message (virtual) for real offset
        #   management. We could keep a real message reference but this can be memory consuming
        #   and not worth it.
        def initialize(topic, partition)
          @topic = topic
          @partition = partition
          @groups = []
          @marked = {}
          @real_offset = -1
        end

        # Clears the manager for the next collective operation
        def clear
          @groups.clear
          @marked = {}
          @real_offset = -1
        end

        # Registers an offset group coming from one virtual consumer. In order to move the real
        # underlying offset accordingly, we need to track the offset groups of the virtual
        # consumers independently and only materialize the end result.
        #
        # @param offsets_group [Array<Integer>] offsets from one virtual consumer
        def register(offsets_group)
          @groups << offsets_group

          offsets_group.each { |offset| @marked[offset] = false }
        end

        # Marks the given message as virtually consumed.
        # We mark the given message offset and all earlier offsets from the same group as done,
        # and then refresh our real offset representation, as it might have advanced to a newer
        # real offset.
        # @param message [Karafka::Messages::Message] message coming from VP we want to mark
        def mark(message)
          offset = message.offset

          group = @groups.find { |reg_group| reg_group.include?(offset) }

          # This case can happen when someone uses MoM and wants to mark a message from a previous
          # batch as consumed. We can add it, since the real offset refresh will point to it
          unless group
            group = [offset]
            @groups << group
          end

          position = group.index(offset)

          # Mark all previous messages from the same group also as virtually consumed
          group[0..position].each do |markable_offset|
            @marked[markable_offset] = true
          end

          # Recompute the real offset representation
          materialize_real_offset
        end

        # Marks all offsets from all groups up to and including the `message` offset.
        # Useful when operating in a collapsed state for marking
        # @param message [Karafka::Messages::Message]
        def mark_until(message)
          mark(message)

          @groups.each do |group|
            group.each do |offset|
              next if offset > message.offset

              @marked[offset] = true
            end
          end

          materialize_real_offset
        end

        # @return [Array<Integer>] offsets of messages already marked as consumed virtually
        def marked
          @marked.select { |_, status| status }.map(&:first).sort
        end

        # Is there a real offset we can mark as consumed?
        # @return [Boolean]
        def markable?
          !@real_offset.negative?
        end

        # @return [Messages::Seek] markable message for real offset marking
        def markable
          raise Errors::InvalidRealOffsetUsageError unless markable?

          Messages::Seek.new(
            @topic,
            @partition,
            @real_offset
          )
        end

        private

        # Recomputes the biggest possible real offset we can have.
        # It picks the biggest offset that ends an uninterrupted stream of virtually consumed
        # offsets (counting from the lowest known offset), because this will be the collective
        # offset.
        def materialize_real_offset
          @marked.to_a.sort_by(&:first).each do |offset, marked|
            break unless marked

            @real_offset = offset
          end

          # Not even the lowest known offset is consumed yet, so point right before it
          @real_offset = (@marked.keys.min - 1) if @real_offset.negative?
        end
      end
    end
  end
end
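=begin
A minimal sketch of which offsets `#mark` and `#mark_until` flag as virtually consumed.
It assumes plain Integer offsets instead of `Karafka::Messages::Message` objects. The
helper names `offsets_marked_by_mark` and `offsets_marked_by_mark_until` are hypothetical
and not part of Karafka. They only mirror the selection logic of the class above and do
not track any state.

```ruby
# Hypothetical helper mirroring VirtualOffsetManager#mark: the prefix of the group that
# owns `offset` (up to and including it), or just [offset] when no group contains it
# (the MoM case of marking a message from a previous batch).
def offsets_marked_by_mark(groups, offset)
  group = groups.find { |reg_group| reg_group.include?(offset) } || [offset]
  group[0..group.index(offset)]
end

# Hypothetical helper mirroring VirtualOffsetManager#mark_until: every registered offset
# not greater than `offset`, across all groups, plus `offset` itself.
def offsets_marked_by_mark_until(groups, offset)
  (groups.flatten.select { |other| other <= offset } | [offset]).sort
end

# Two virtual consumers received interleaved offsets of the same partition
groups = [[0, 2, 4, 6], [1, 3, 5, 7]]

p offsets_marked_by_mark(groups, 4)       # => [0, 2, 4]
p offsets_marked_by_mark_until(groups, 4) # => [0, 1, 2, 3, 4]
p offsets_marked_by_mark(groups, 9)       # => [9]
```

With `#mark` alone, offsets 1 and 3 stay unmarked, so the collective offset cannot pass 0
until the second virtual consumer catches up. `#mark_until` ignores group boundaries,
which is why it fits the collapsed mode.
=end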
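=begin
A minimal sketch of the contiguous-prefix rule used by `#materialize_real_offset`. It is
written as a hypothetical stand-alone `collective_offset` function over a Hash that maps
each known offset to whether it was virtually consumed. Unlike the class above, it
recomputes the result from scratch instead of keeping the previous `@real_offset` between
calls.

```ruby
# Hypothetical stand-alone version of the materialization rule
def collective_offset(marked)
  real_offset = -1

  # Walk offsets in ascending order and stop at the first one not consumed yet
  marked.to_a.sort_by(&:first).each do |offset, done|
    break unless done

    real_offset = offset
  end

  # Nothing consumed from the lowest offset onward: point right before it
  real_offset.negative? ? marked.keys.min - 1 : real_offset
end

p collective_offset({ 10 => true, 11 => true, 12 => false, 13 => true }) # => 11
p collective_offset({ 10 => false, 11 => true })                        # => 9
p collective_offset({ 0 => false, 1 => true })                          # => -1
```

Offset 13 is consumed but does not count, because 12 is still pending. A result of `-1`
corresponds to `markable?` returning `false`, since there is no real offset to commit yet.
=end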