Sha256: 47314f78f5826275371c4da4525c704ec8bbf923016ef032c84d3665054da6f0

Contents?: true

Size: 1.3 KB

Versions: 13

Compression:

Stored size: 1.3 KB

Contents

# frozen_string_literal: true

module Karafka
  module Processing
    module Jobs
      # Primary job kind: drives an executor through the three consumption phases
      # (`before_consume`, `consume`, `after_consume`) for one batch of messages.
      #
      # @note `executor` is invoked below as a method rather than read from `@executor`, so the
      #   reader has to be provided outside of this class (presumably by `Base` - confirm).
      class Consume < Base
        # @return [Array<Rdkafka::Consumer::Message>] messages this job was created with
        # @note NOTE(review): the constructor documents the very same object as
        #   `Karafka::Messages::Messages`; one of the two type annotations is likely stale.
        attr_reader :messages

        # Stores the collaborators and remembers the moment this job got created.
        #
        # @param executor [Karafka::Processing::Executor] executor that is supposed to run this
        #   job
        # @param messages [Karafka::Messages::Messages] batch of messages to be processed
        # @param coordinator [Karafka::Processing::Coordinator] processing coordinator
        # @return [Consume]
        def initialize(executor, messages, coordinator)
          @coordinator = coordinator
          @messages = messages
          @executor = executor
          # Captured here, at job creation, and later handed over to the executor in #before_call
          @created_at = Time.now
          # Explicit empty parentheses so none of our arguments are forwarded to the parent
          super()
        end

        # Pre-consumption phase: hands the messages, the job creation time and the coordinator
        # over to the executor
        def before_call
          executor.before_consume(
            @messages,
            @created_at,
            @coordinator
          )
        end

        # Consumption phase: delegates to the executor
        def call
          executor.consume
        end

        # Post-consumption phase: delegates to the executor, which is expected to take care of
        # any error handling and remaining post-processing work
        def after_call
          executor.after_consume
        end
      end
    end
  end
end

Version data entries

13 entries across 13 versions & 1 rubygems

Version Path
karafka-2.0.5 lib/karafka/processing/jobs/consume.rb
karafka-2.0.4 lib/karafka/processing/jobs/consume.rb
karafka-2.0.3 lib/karafka/processing/jobs/consume.rb
karafka-2.0.2 lib/karafka/processing/jobs/consume.rb
karafka-2.0.1 lib/karafka/processing/jobs/consume.rb
karafka-2.0.0 lib/karafka/processing/jobs/consume.rb
karafka-2.0.0.rc6 lib/karafka/processing/jobs/consume.rb
karafka-2.0.0.rc5 lib/karafka/processing/jobs/consume.rb
karafka-2.0.0.rc4 lib/karafka/processing/jobs/consume.rb
karafka-2.0.0.rc3 lib/karafka/processing/jobs/consume.rb
karafka-2.0.0.rc2 lib/karafka/processing/jobs/consume.rb
karafka-2.0.0.rc1 lib/karafka/processing/jobs/consume.rb
karafka-2.0.0.beta5 lib/karafka/processing/jobs/consume.rb