Sha256: 7f14457484bef5375313b8a83b3b3f9e75bdd74070bcd746cc4a8d7efb24f50b

Contents?: true

Size: 1.85 KB

Versions: 8

Compression:

Stored size: 1.85 KB

Contents

# frozen_string_literal: true

module Karafka
  module TimeTrackers
    # Tracker used to keep time reference when we last time dispatched any job related to a given
    # topic partition.
    #
    # We can use it to know when last time a job was scheduled
    #
    # @note We do not track revocation as on revocation we clear given topic partition reference
    #   not to have a potential memory leak
    #
    # @note We do not track shutdown jobs as shutdown is finishing the process, so no time
    #   sensitive operations remain that would use this
    #
    # @note We consider partition as active if we scheduled any job related to it within the tick
    #   interval. This has nothing to do whether a partition is assigned.
    class PartitionUsage < Base
      # Creates new partition usage time tracker
      def initialize
        super

        @last_usage = Hash.new do |topics_hash, topic_name|
          topics_hash[topic_name] = Hash.new do |partitions_hash, partition_id|
            partitions_hash[partition_id] = 0
          end
        end
      end

      # @param topic [String]
      # @param partition [Integer]
      # @param interval [Integer] minimum interval
      # @return [Boolean] was this topic partition active
      def active?(topic, partition, interval)
        monotonic_now - @last_usage[topic][partition] < interval
      end

      # Marks usage of given partition
      #
      # @param topic [String]
      # @param partition [Integer]
      def track(topic, partition)
        @last_usage[topic][partition] = monotonic_now
      end

      # Clears references about given partition. Useful on revocation so we do not store old
      # unassigned partitions data
      #
      # @param topic [String]
      # @param partition [Integer]
      def revoke(topic, partition)
        @last_usage[topic].delete(partition)
      end
    end
  end
end

Version data entries

8 entries across 8 versions & 1 rubygems

Version Path
karafka-2.3.4 lib/karafka/time_trackers/partition_usage.rb
karafka-2.3.3 lib/karafka/time_trackers/partition_usage.rb
karafka-2.3.2 lib/karafka/time_trackers/partition_usage.rb
karafka-2.3.1 lib/karafka/time_trackers/partition_usage.rb
karafka-2.3.0 lib/karafka/time_trackers/partition_usage.rb
karafka-2.3.0.rc1 lib/karafka/time_trackers/partition_usage.rb
karafka-2.3.0.alpha2 lib/karafka/time_trackers/partition_usage.rb
karafka-2.3.0.alpha1 lib/karafka/time_trackers/partition_usage.rb