Sha256: bc0283c9cdfc519725a7de2c2a0c708202a21c22a0540cf9038c3d07c52e76fe

Contents?: true

Size: 957 Bytes

Versions: 1

Compression:

Stored size: 957 Bytes

Contents

# frozen_string_literal: true

module Karafka
  module Processing
    module Jobs
      # The main job type. It runs the executor, which triggers processing of the given topic
      # partition's messages in the underlying consumer instance.
      class Consume < Base
        # @param executor [Karafka::Processing::Executor] executor that is supposed to run a given
        #   job
        # @param messages [Array<Rdkafka::Consumer::Message>] array with raw rdkafka messages with
        #   which we are supposed to work
        # @return [Consume]
        def initialize(executor, messages)
          @executor = executor
          @messages = messages
          @created_at = Time.now
          super()
        end

        # Runs the preparations on the executor
        def prepare
          executor.prepare(@messages, @created_at)
        end

        # Runs the given executor
        def call
          executor.consume
        end
      end
    end
  end
end
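
The job above follows a two-phase lifecycle: `prepare` hands the messages and the creation time to the executor, and `call` then triggers consumption. The sketch below illustrates that flow in isolation. `SketchExecutor`, `SketchBaseJob`, and `SketchConsumeJob` are hypothetical stand-ins, not the real Karafka classes, and the sketch assumes the base class exposes an `executor` reader, which the file above relies on but does not show.

```ruby
# Hypothetical stand-in for Karafka::Processing::Executor. It only records
# the calls it receives so the ordering can be inspected.
class SketchExecutor
  attr_reader :log

  def initialize
    @log = []
  end

  # Mirrors the two-argument prepare call used by the job above.
  def prepare(messages, created_at)
    @log << [:prepare, messages.size, created_at.class]
  end

  def consume
    @log << [:consume]
  end
end

# Hypothetical stand-in for the Base job class. The executor reader is an
# assumption about what the real base class provides.
class SketchBaseJob
  attr_reader :executor

  # No-op by default; subclasses override it when they need preparation.
  def prepare; end

  def call
    raise NotImplementedError, 'Please implement in a subclass'
  end
end

# Same shape as the Consume job shown above, built on the stand-ins.
class SketchConsumeJob < SketchBaseJob
  def initialize(executor, messages)
    @executor = executor
    @messages = messages
    @created_at = Time.now
    super()
  end

  def prepare
    executor.prepare(@messages, @created_at)
  end

  def call
    executor.consume
  end
end

# Usage: whatever schedules the job runs prepare first, then call.
executor = SketchExecutor.new
job = SketchConsumeJob.new(executor, %w[m1 m2 m3])
job.prepare
job.call
```

Capturing `@created_at` in the constructor rather than in `prepare` means the timestamp reflects when the job was built, not when it started running.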

Version data entries

1 entry across 1 version & 1 rubygem

Version Path
karafka-2.0.0.beta1 lib/karafka/processing/jobs/consume.rb