Sha256: cb5fc6dd1c475f85ddf2f2bcd5dde66953fd0a78bc3b98fb0baf9f4b86938738
Contents?: true
Size: 1.96 KB
Versions: 3
Compression:
Stored size: 1.96 KB
Contents
# frozen_string_literal: true # Karafka module namespace module Karafka # Base controller from which all Karafka controllers should inherit class BaseController extend ActiveSupport::DescendantsTracker class << self attr_reader :topic # Assigns a topic to a controller and build up proper controller functionalities, so it can # cooperate with the topic settings # @param topic [Karafka::Routing::Topic] # @return [Karafka::Routing::Topic] assigned topic def topic=(topic) @topic = topic Controllers::Includer.call(self) end end # @return [Karafka::Routing::Topic] topic to which a given controller is subscribed def topic self.class.topic end # Creates lazy loaded params batch object # @note Until first params usage, it won't parse data at all # @param messages [Array<Kafka::FetchedMessage>, Array<Hash>] messages with raw # content (from Kafka) or messages inside a hash (from backend, etc) # @return [Karafka::Params::ParamsBatch] lazy loaded params batch def params_batch=(messages) @params_batch = Karafka::Params::ParamsBatch.new(messages, topic.parser) end # Executes the default controller flow. def call process end private # We make it private as it should be accesible only from the inside of a controller attr_reader :params_batch # @return [Karafka::Connection::Consumer] messages consumer that can be used to # commit manually offset or pause / stop consumer based on the business logic def consumer Persistence::Consumer.read end # Method that will perform business logic and on data received from Kafka (it will consume # the data) # @note This method needs bo be implemented in a subclass. We stub it here as a failover if # someone forgets about it or makes on with typo def consume raise NotImplementedError, 'Implement this in a subclass' end end end
Version data entries
3 entries across 3 versions & 1 rubygems
Version | Path |
---|---|
karafka-1.1.2 | lib/karafka/base_controller.rb |
karafka-1.1.1 | lib/karafka/base_controller.rb |
karafka-1.1.0 | lib/karafka/base_controller.rb |