Sha256: a79e64331eca5649c0fe87c4c26f9bbfaae92da90979a1532ef0104e135bc79f

Contents?: true

Size: 953 Bytes

Versions: 20

Compression:

Stored size: 953 Bytes

Contents

# frozen_string_literal: true

module Karafka
  module ActiveJob
    # This is the consumer for ActiveJob that eats the messages enqueued with it one after another.
    # It marks the offset after each message, so we make sure, none of the jobs is executed twice
    class Consumer < ::Karafka::BaseConsumer
      # Executes the ActiveJob logic
      # @note ActiveJob does not support batches, so we just run one message after another
      def consume
        messages.each do |message|
          break if Karafka::App.stopping?

          ::ActiveJob::Base.execute(
            # We technically speaking could set this as deserializer and reference it from the
            # message instead of using the `#raw_payload`. This is not done on purpose to simplify
            # the ActiveJob setup here
            ::ActiveSupport::JSON.decode(message.raw_payload)
          )

          mark_as_consumed(message)
        end
      end
    end
  end
end

Version data entries

20 entries across 20 versions & 1 rubygems

Version Path
karafka-2.0.15 lib/karafka/active_job/consumer.rb
karafka-2.0.14 lib/karafka/active_job/consumer.rb
karafka-2.0.13 lib/karafka/active_job/consumer.rb
karafka-2.0.12 lib/karafka/active_job/consumer.rb
karafka-2.0.11 lib/karafka/active_job/consumer.rb
karafka-2.0.10 lib/karafka/active_job/consumer.rb
karafka-2.0.9 lib/karafka/active_job/consumer.rb
karafka-2.0.8 lib/karafka/active_job/consumer.rb
karafka-2.0.7 lib/karafka/active_job/consumer.rb
karafka-2.0.6 lib/karafka/active_job/consumer.rb
karafka-2.0.5 lib/karafka/active_job/consumer.rb
karafka-2.0.4 lib/karafka/active_job/consumer.rb
karafka-2.0.3 lib/karafka/active_job/consumer.rb
karafka-2.0.2 lib/karafka/active_job/consumer.rb
karafka-2.0.1 lib/karafka/active_job/consumer.rb
karafka-2.0.0 lib/karafka/active_job/consumer.rb
karafka-2.0.0.rc6 lib/karafka/active_job/consumer.rb
karafka-2.0.0.rc5 lib/karafka/active_job/consumer.rb
karafka-2.0.0.rc4 lib/karafka/active_job/consumer.rb
karafka-2.0.0.rc3 lib/karafka/active_job/consumer.rb