Sha256: cb5fc6dd1c475f85ddf2f2bcd5dde66953fd0a78bc3b98fb0baf9f4b86938738

Contents?: true

Size: 1.96 KB

Versions: 3

Compression:

Stored size: 1.96 KB

Contents

# frozen_string_literal: true

# Karafka module namespace
module Karafka
  # Base controller from which all Karafka controllers should inherit
  class BaseController
    extend ActiveSupport::DescendantsTracker

    class << self
      attr_reader :topic

      # Assigns a topic to a controller and builds up the proper controller functionalities, so it
      #   can cooperate with the topic settings
      # @param topic [Karafka::Routing::Topic]
      # @return [Karafka::Routing::Topic] assigned topic
      def topic=(topic)
        @topic = topic
        Controllers::Includer.call(self)
      end
    end

    # @return [Karafka::Routing::Topic] topic to which a given controller is subscribed
    def topic
      self.class.topic
    end

    # Creates lazy loaded params batch object
    # @note Until first params usage, it won't parse data at all
    # @param messages [Array<Kafka::FetchedMessage>, Array<Hash>] messages with raw
    #   content (from Kafka) or messages inside a hash (from backend, etc)
    # @return [Karafka::Params::ParamsBatch] lazy loaded params batch
    def params_batch=(messages)
      @params_batch = Karafka::Params::ParamsBatch.new(messages, topic.parser)
    end

    # Executes the default controller flow.
    def call
      process
    end

    private

    # We make it private as it should be accessible only from the inside of a controller
    attr_reader :params_batch

    # @return [Karafka::Connection::Consumer] messages consumer that can be used to
    #    manually commit offsets or pause / stop the consumer based on the business logic
    def consumer
      Persistence::Consumer.read
    end

    # Method that will perform business logic on data received from Kafka (it will consume
    #   the data)
    # @note This method needs to be implemented in a subclass. We stub it here as a failover if
    #   someone forgets about it or makes one with a typo
    def consume
      raise NotImplementedError, 'Implement this in a subclass'
    end
  end
end
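
The listing above has `call` delegating to `process`, which is not defined in this file. Presumably `Controllers::Includer` mixes it in once a topic is assigned. The sketch below illustrates that flow without depending on Karafka. Every name in it (`SketchBaseController`, `SketchInlineBackend`, `EventsController`, `ForgetfulController`) is a hypothetical stand-in, and the inline backend that forwards `process` to `consume` is an assumption, not the gem's actual implementation.

```ruby
# Hypothetical backend module: an assumption about what the includer mixes in.
module SketchInlineBackend
  private

  # Forwards straight to the subclass's business logic
  def process
    consume
  end
end

# Stand-in for Karafka::BaseController, reduced to the call flow only
class SketchBaseController
  class << self
    attr_reader :topic

    # Stores the topic and mixes in the (assumed) backend, mimicking the
    # Controllers::Includer.call(self) step from the original listing
    def topic=(topic)
      @topic = topic
      include SketchInlineBackend
    end
  end

  # Executes the default controller flow
  def call
    process
  end

  private

  # Failover stub, same as in the original listing
  def consume
    raise NotImplementedError, 'Implement this in a subclass'
  end
end

# A subclass that implements consume
class EventsController < SketchBaseController
  def consume
    :consumed
  end
end

# A subclass that forgets to implement consume
class ForgetfulController < SketchBaseController; end

EventsController.topic = :events
ForgetfulController.topic = :events

result = EventsController.new.call

# NotImplementedError is a ScriptError, so a bare `rescue` would not catch it
error = begin
  ForgetfulController.new.call
rescue NotImplementedError => e
  e.message
end
```

Because `NotImplementedError` does not descend from `StandardError`, code that wraps `call` in a plain `rescue => e` will not catch the failover stub's error. It has to be rescued by name, as in the sketch.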

Version data entries

3 entries across 3 versions & 1 rubygems

Version Path
karafka-1.1.2 lib/karafka/base_controller.rb
karafka-1.1.1 lib/karafka/base_controller.rb
karafka-1.1.0 lib/karafka/base_controller.rb