Sha256: 35278c44242e769ea2ca8aed1bd30e3afe5842806ceb6057d7eecb9941ffb7c7

Contents?: true

Size: 850 Bytes

Versions: 6

Compression:

Stored size: 850 Bytes

Contents

# frozen_string_literal: true

module Karafka
  module Processing
    module Jobs
      # The main job type. It runs the executor, which triggers processing of the given topic
      # partition's messages in an underlying consumer instance.
      class Consume < Base
        # @param executor [Karafka::Processing::Executor] executor that is supposed to run a given
        #   job
        # @param messages [Array<Rdkafka::Consumer::Message>] array with raw rdkafka messages with
        #   which we are supposed to work
        # @return [Consume]
        def initialize(executor, messages)
          @executor = executor
          @messages = messages
          @created_at = Time.now
          super()
        end

        # Runs the given executor.
        def call
          executor.consume(@messages, @created_at)
        end
      end
    end
  end
end
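
The job above captures its creation time and hands it to the executor together with the messages. A minimal, self-contained sketch of that hand-off follows. It does not use Karafka itself: `SketchBaseJob`, `SketchConsumeJob`, `RecordingExecutor`, and `run_sketch_job` are hypothetical stand-ins, and the assumption that the base class exposes an `executor` reader is inferred from the bare `executor` call in `#call`, not confirmed by this listing.

```ruby
# frozen_string_literal: true

# Hypothetical stand-in for the base job class (assumption: it exposes an
# `executor` reader, since the job's #call uses `executor` without an @).
class SketchBaseJob
  attr_reader :executor

  def initialize
    # The real base class may set up additional state; this sketch sets none.
  end
end

# Mirrors the shape of the Consume job shown above.
class SketchConsumeJob < SketchBaseJob
  def initialize(executor, messages)
    @executor = executor
    @messages = messages
    # Captured when the job is built, not when it is run.
    @created_at = Time.now
    super()
  end

  # Delegates the actual work to the executor.
  def call
    executor.consume(@messages, @created_at)
  end
end

# Hypothetical executor that only records what it was asked to consume.
class RecordingExecutor
  attr_reader :calls

  def initialize
    @calls = []
  end

  def consume(messages, created_at)
    @calls << [messages, created_at]
    messages.size
  end
end

# Builds a job around a recording executor and runs it once.
def run_sketch_job(messages)
  executor = RecordingExecutor.new
  job = SketchConsumeJob.new(executor, messages)
  result = job.call
  [executor, result]
end
```

Because the timestamp is taken in the constructor, an executor receiving it could in principle measure how long a job waited before it ran; whether Karafka uses it that way is not shown in this file.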

Version data entries

6 entries across 6 versions & 1 rubygem

Version Path
karafka-2.0.0.alpha6 lib/karafka/processing/jobs/consume.rb
karafka-2.0.0.alpha5 lib/karafka/processing/jobs/consume.rb
karafka-2.0.0.alpha4 lib/karafka/processing/jobs/consume.rb
karafka-2.0.0.alpha3 lib/karafka/processing/jobs/consume.rb
karafka-2.0.0.alpha2 lib/karafka/processing/jobs/consume.rb
karafka-2.0.0.alpha1 lib/karafka/processing/jobs/consume.rb