Sha256: d7a85a24498a4f6fa151ad284e56def94f98e2198545dfe4de085df991a30f48

Contents?: true

Size: 1.79 KB

Versions: 27

Compression:

Stored size: 1.79 KB

Contents

# frozen_string_literal: true

# Karafka module namespace
module Karafka
  # Parent class for every Karafka consumer. Application consumers subclass it and
  # provide their own #consume implementation.
  class BaseConsumer
    extend Forwardable

    # Offset-marking and heartbeat helpers are forwarded to the client, so that manual
    # (non-automatic) offset management does not require touching the client instance
    # directly. They are kept private: they are meant to be used from within a consumer only.
    client_helpers = %i[
      mark_as_consumed
      mark_as_consumed!
      trigger_heartbeat
      trigger_heartbeat!
    ]

    def_delegators :client, *client_helpers
    private(*client_helpers)

    # @return [Karafka::Routing::Topic] topic this consumer instance was built for
    attr_reader :topic
    # @return [Karafka::Params::ParamsBatch] params batch that is currently assigned
    attr_accessor :params_batch

    # Stores the topic and hands the fresh instance over to Consumers::Includer, which
    #   equips it with the functionalities matching the topic settings
    # @param topic [Karafka::Routing::Topic] topic that this consumer will work with
    def initialize(topic)
      @topic = topic
      Consumers::Includer.call(self)
    end

    # Runs the default consumer flow by delegating to #process
    # @note #process is not defined in this class; presumably it is provided by the modules
    #   that Consumers::Includer attaches to the instance (confirm against the includer)
    def call
      process
    end

    private

    # @return [Karafka::Connection::Client] client used to consume messages; based on the
    #   business logic it can be used to commit offsets manually or to pause / stop consuming
    def client
      Persistence::Client.read
    end

    # Place for the business logic that consumes the data received from Kafka
    # @note This method needs to be implemented in a subclass. It is stubbed here as a
    #   failover, in case someone forgets to define it or defines it with a typo in the name
    # @raise [NotImplementedError] always, unless overridden in a subclass
    def consume
      raise NotImplementedError, 'Implement this in a subclass'
    end
  end
end

Version data entries

27 entries across 27 versions & 1 rubygems

Version Path
karafka-1.4.15 lib/karafka/base_consumer.rb
karafka-1.4.14 lib/karafka/base_consumer.rb
karafka-1.4.13 lib/karafka/base_consumer.rb
karafka-1.4.12 lib/karafka/base_consumer.rb
karafka-1.4.11 lib/karafka/base_consumer.rb
karafka-1.4.10 lib/karafka/base_consumer.rb
karafka-1.4.9 lib/karafka/base_consumer.rb
karafka-1.4.8 lib/karafka/base_consumer.rb
karafka-1.4.7 lib/karafka/base_consumer.rb
karafka-1.4.6 lib/karafka/base_consumer.rb
karafka-1.4.5 lib/karafka/base_consumer.rb
karafka-1.4.4 lib/karafka/base_consumer.rb
karafka-1.4.3 lib/karafka/base_consumer.rb
karafka-1.4.2 lib/karafka/base_consumer.rb
karafka-1.4.1 lib/karafka/base_consumer.rb
karafka-1.4.0 lib/karafka/base_consumer.rb
karafka-1.4.0.rc2 lib/karafka/base_consumer.rb
karafka-1.4.0.rc1 lib/karafka/base_consumer.rb
karafka-1.3.7 lib/karafka/base_consumer.rb
karafka-1.3.6 lib/karafka/base_consumer.rb