Sha256: 0621bde2cab10be137ac78901b3e25691d2b1374fd24ba9112f53fb639e87ab4

Contents?: true

Size: 910 Bytes

Versions: 12

Compression:

Stored size: 910 Bytes

Contents

# frozen_string_literal: true

module Karafka
  module ActiveJob
    # This is the consumer for ActiveJob. It processes the messages enqueued with ActiveJob one
    # after another and marks the offset after each message to make sure that none of the jobs
    # is executed twice.
    class Consumer < ::Karafka::BaseConsumer
      # Executes the ActiveJob logic
      # @note ActiveJob does not support batches, so we just run one message after another
      def consume
        messages.each do |message|
          ::ActiveJob::Base.execute(
            # Technically, we could set this as the deserializer and reference it from the
            # message instead of using `#raw_payload`. We deliberately do not, to keep the
            # ActiveJob setup here simple
            ::ActiveSupport::JSON.decode(message.raw_payload)
          )

          mark_as_consumed(message)
        end
      end
    end
  end
end
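
The per-message flow above (decode the raw payload, run the job, then mark the offset) can be tried without Kafka or Rails. The following is a minimal sketch, not Karafka's implementation: `FakeMessage` and `SketchConsumer` are hypothetical stand-ins, the standard library `JSON` module replaces `::ActiveSupport::JSON`, and the job payloads are invented for illustration.

```ruby
require "json"

# Hypothetical stand-in for a Karafka message: only the raw payload and the offset.
FakeMessage = Struct.new(:raw_payload, :offset)

# Hypothetical stand-in for the consumer above; it does not use Karafka's API.
class SketchConsumer
  attr_reader :executed, :last_marked_offset

  def initialize(messages)
    @messages = messages
    @executed = []
    @last_marked_offset = nil
  end

  # Processes messages one by one, marking each offset only after its job ran.
  def consume
    @messages.each do |message|
      # Decode the serialized job data from the raw (undeserialized) payload
      job_data = JSON.parse(message.raw_payload)
      execute(job_data)
      mark_as_consumed(message)
    end
  end

  private

  # Assumption: "running" a job here just records its class name.
  def execute(job_data)
    @executed << job_data["job_class"]
  end

  # Assumption: marking as consumed just remembers the last processed offset.
  def mark_as_consumed(message)
    @last_marked_offset = message.offset
  end
end

messages = [
  FakeMessage.new(JSON.generate("job_class" => "WelcomeJob", "arguments" => [1]), 0),
  FakeMessage.new(JSON.generate("job_class" => "CleanupJob", "arguments" => []), 1)
]

consumer = SketchConsumer.new(messages)
consumer.consume
```

Because the offset is recorded only after a job finishes, a failure partway through the batch leaves `last_marked_offset` pointing at the last job that completed, so processing can resume from the message after it.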

Version data entries

12 entries across 12 versions & 1 rubygems

Version Path
karafka-2.0.0.rc2 lib/karafka/active_job/consumer.rb
karafka-2.0.0.rc1 lib/karafka/active_job/consumer.rb
karafka-2.0.0.beta5 lib/karafka/active_job/consumer.rb
karafka-2.0.0.beta4 lib/karafka/active_job/consumer.rb
karafka-2.0.0.beta3 lib/karafka/active_job/consumer.rb
karafka-2.0.0.beta2 lib/karafka/active_job/consumer.rb
karafka-2.0.0.beta1 lib/karafka/active_job/consumer.rb
karafka-2.0.0.alpha6 lib/karafka/active_job/consumer.rb
karafka-2.0.0.alpha5 lib/karafka/active_job/consumer.rb
karafka-2.0.0.alpha4 lib/karafka/active_job/consumer.rb
karafka-2.0.0.alpha3 lib/karafka/active_job/consumer.rb
karafka-2.0.0.alpha2 lib/karafka/active_job/consumer.rb