Sha256: 463c3ef707ad64cf3854dcc4be8b7ab4deb8e52dd8adeafadddbcd5c853c4710

Contents?: true

Size: 1.45 KB

Versions: 7

Compression:

Stored size: 1.45 KB

Contents

# frozen_string_literal: true

module Karafka
  module Processing
    # Buffer for executors of a given subscription group. It wraps around the concept of building
    # and caching them, so we can re-use them instead of creating new each time.
    class ExecutorsBuffer
      # @param client [Connection::Client]
      # @param subscription_group [Routing::SubscriptionGroup]
      # @return [ExecutorsBuffer]
      def initialize(client, subscription_group)
        @subscription_group = subscription_group
        @client = client
        @buffer = Hash.new { |h, k| h[k] = {} }
      end

      # @param topic [String] topic name
      # @param partition [Integer] partition number
      # @param pause [TimeTrackers::Pause] pause corresponding with provided topic and partition
      # @return [Executor] consumer executor
      def fetch(
        topic,
        partition,
        pause
      )
        topic = @subscription_group.topics.find { |ktopic| ktopic.name == topic }

        topic || raise(Errors::TopicNotFoundError, topic)

        @buffer[topic][partition] ||= Executor.new(
          @subscription_group.id,
          @client,
          topic,
          pause
        )
      end

      # Runs the shutdown on all active executors.
      def shutdown
        @buffer.values.map(&:values).flatten.each(&:shutdown)
      end

      # Clears the executors buffer. Useful for critical errors recovery.
      def clear
        @buffer.clear
      end
    end
  end
end

Version data entries

7 entries across 7 versions & 1 rubygems

Version Path
karafka-2.0.0.beta1 lib/karafka/processing/executors_buffer.rb
karafka-2.0.0.alpha6 lib/karafka/processing/executors_buffer.rb
karafka-2.0.0.alpha5 lib/karafka/processing/executors_buffer.rb
karafka-2.0.0.alpha4 lib/karafka/processing/executors_buffer.rb
karafka-2.0.0.alpha3 lib/karafka/processing/executors_buffer.rb
karafka-2.0.0.alpha2 lib/karafka/processing/executors_buffer.rb
karafka-2.0.0.alpha1 lib/karafka/processing/executors_buffer.rb