Sha256: c76c1383432e6a7ac9c788ab01482ccc6daf6db14d1c392f211b469b0b0c2eec

Contents?: true

Size: 1.3 KB

Versions: 33

Compression:

Stored size: 1.3 KB

Contents

# frozen_string_literal: true

module Karafka
  module Processing
    module Jobs
      # Consumption job. Each lifecycle hook defined here is forwarded to the executor the job
      # was built with; the executor is what triggers processing of the given topic partition
      # messages in an underlying consumer instance.
      #
      # @note `executor` is not defined in this class - presumably a reader inherited from
      #   `Base`; confirm against the parent class.
      class Consume < Base
        # @note The constructor documents the very same object as `Karafka::Messages::Messages`,
        #   so one of the two type annotations is likely stale - confirm against the callers.
        # @return [Array<Rdkafka::Consumer::Message>] messages this job was created with
        attr_reader :messages

        # Stores the executor and the messages, then runs the parent initializer.
        #
        # @param executor [Karafka::Processing::Executor] executor that is supposed to run this
        #   job
        # @param messages [Karafka::Messages::Messages] batch of messages handed over to the
        #   executor before the job is enqueued
        # @return [Consume]
        def initialize(executor, messages)
          @messages = messages
          @executor = executor
          # Explicit empty parentheses: the parent initializer must not receive our arguments
          super()
        end

        # Pre-enqueue hook: passes the stored messages to the executor so it can run whatever
        # preparation has to happen before this job lands in the queue.
        def before_enqueue
          executor.before_enqueue(@messages)
        end

        # Pre-call hook: asks the executor to run its before consumption preparations.
        def before_call
          executor.before_consume
        end

        # Performs the job itself by asking the executor to consume.
        def call
          executor.consume
        end

        # Post-call hook: lets the executor run error handling and any other post-consumption
        # work.
        def after_call
          executor.after_consume
        end
      end
    end
  end
end

Version data entries

33 entries across 33 versions & 1 rubygems

Version Path
karafka-2.2.13 lib/karafka/processing/jobs/consume.rb
karafka-2.2.12 lib/karafka/processing/jobs/consume.rb
karafka-2.2.11 lib/karafka/processing/jobs/consume.rb
karafka-2.2.10 lib/karafka/processing/jobs/consume.rb
karafka-2.2.9 lib/karafka/processing/jobs/consume.rb
karafka-2.2.8 lib/karafka/processing/jobs/consume.rb
karafka-2.2.8.beta1 lib/karafka/processing/jobs/consume.rb
karafka-2.2.7 lib/karafka/processing/jobs/consume.rb
karafka-2.2.6 lib/karafka/processing/jobs/consume.rb
karafka-2.2.5 lib/karafka/processing/jobs/consume.rb
karafka-2.2.4 lib/karafka/processing/jobs/consume.rb
karafka-2.2.3 lib/karafka/processing/jobs/consume.rb
karafka-2.2.2 lib/karafka/processing/jobs/consume.rb
karafka-2.2.1 lib/karafka/processing/jobs/consume.rb
karafka-2.2.0 lib/karafka/processing/jobs/consume.rb
karafka-2.1.13 lib/karafka/processing/jobs/consume.rb
karafka-2.1.12 lib/karafka/processing/jobs/consume.rb
karafka-2.1.11 lib/karafka/processing/jobs/consume.rb
karafka-2.1.10 lib/karafka/processing/jobs/consume.rb
karafka-2.1.9 lib/karafka/processing/jobs/consume.rb