# frozen_string_literal: true

module Karafka
  module Processing
    # Buffer for executors of a given subscription group. It wraps around the concept of building
    # and caching them, so we can re-use them instead of creating new each time.
    class ExecutorsBuffer
      # @param client [Connection::Client]
      # @param subscription_group [Routing::SubscriptionGroup]
      # @return [ExecutorsBuffer]
      def initialize(client, subscription_group)
        @subscription_group = subscription_group
        @client = client
        # We need two layers here to keep track of topics, partitions and processing groups
        @buffer = Hash.new { |h, k| h[k] = Hash.new { |h2, k2| h2[k2] = {} } }
      end

      # Finds or creates an executor based on the provided details
      #
      # @param topic [String] topic name
      # @param partition [Integer] partition number
      # @param parallel_key [String] parallel group key
      # @return [Executor] consumer executor
      def find_or_create(topic, partition, parallel_key)
        ktopic = find_topic(topic)

        @buffer[ktopic][partition][parallel_key] ||= Executor.new(
          @subscription_group.id,
          @client,
          ktopic
        )
      end

      # Revokes executors of a given topic partition, so they won't be used anymore for incoming
      # messages
      #
      # @param topic [String] topic name
      # @param partition [Integer] partition number
      def revoke(topic, partition)
        ktopic = find_topic(topic)

        @buffer[ktopic][partition].clear
      end

      # Finds all the executors available for a given topic partition
      #
      # @param topic [String] topic name
      # @param partition [Integer] partition number
      # @return [Array<Executor>] executors in use for this topic + partition
      def find_all(topic, partition)
        ktopic = find_topic(topic)

        @buffer[ktopic][partition].values
      end

      # Iterates over all available executors and yields them together with topic and partition
      # info
      # @yieldparam [Routing::Topic] karafka routing topic object
      # @yieldparam [Integer] partition number
      # @yieldparam [Executor] given executor
      def each
        @buffer.each do |ktopic, partitions|
          partitions.each do |partition, executors|
            executors.each do |_parallel_key, executor|
              # We skip the parallel key here as it does not serve any value when iterating
              yield(ktopic, partition, executor)
            end
          end
        end
      end

      # Clears the executors buffer. Useful for critical errors recovery.
      def clear
        @buffer.clear
      end

      private

      # Finds topic based on its name
      #
      # @param topic [String] topic we're looking for
      # @return [Karafka::Routing::Topic] topic we're interested in
      def find_topic(topic)
        @subscription_group.topics.find(topic) || raise(Errors::TopicNotFoundError, topic)
      end
    end
  end
end