Sha256: a79e64331eca5649c0fe87c4c26f9bbfaae92da90979a1532ef0104e135bc79f
Contents?: true
Size: 953 Bytes
Versions: 20
Compression:
Stored size: 953 Bytes
Contents
# frozen_string_literal: true

module Karafka
  module ActiveJob
    # This is the consumer for ActiveJob that eats the messages enqueued with it one after another.
    # It marks the offset after each message, so we make sure, none of the jobs is executed twice
    class Consumer < ::Karafka::BaseConsumer
      # Executes the ActiveJob logic
      # @note ActiveJob does not support batches, so we just run one message after another
      def consume
        messages.each do |message|
          break if Karafka::App.stopping?

          ::ActiveJob::Base.execute(
            # We technically speaking could set this as deserializer and reference it from the
            # message instead of using the `#raw_payload`. This is not done on purpose to simplify
            # the ActiveJob setup here
            ::ActiveSupport::JSON.decode(message.raw_payload)
          )

          mark_as_consumed(message)
        end
      end
    end
  end
end
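The per-message flow described in the comments above (stop check, decode, execute, mark offset) can be sketched without Karafka or ActiveJob loaded. This is only an illustration under stated assumptions: `Message`, `SketchConsumer`, and the `stop_after` parameter are hypothetical stand-ins that are not part of the gem, and the standard library `JSON.parse` is used in place of `ActiveSupport::JSON.decode`.

```ruby
require "json"

# Hypothetical stand-in for a Kafka message; only the fields used below.
Message = Struct.new(:offset, :raw_payload)

# Hypothetical stand-in for the consumer. It records what it "executed"
# and the last offset it "marked" instead of calling ActiveJob or Kafka.
class SketchConsumer
  attr_reader :executed, :last_marked_offset

  def initialize(messages, stop_after: nil)
    @messages = messages
    @stop_after = stop_after # simulates a shutdown signal after N jobs
    @executed = []
    @last_marked_offset = nil
  end

  def consume
    @messages.each do |message|
      # Stop between messages, never in the middle of one.
      break if stopping?

      # Decode the raw payload and "run" the job.
      @executed << JSON.parse(message.raw_payload)

      # Mark the offset only after the job has finished.
      @last_marked_offset = message.offset
    end
  end

  private

  def stopping?
    !@stop_after.nil? && @executed.size >= @stop_after
  end
end

messages = (0..2).map do |i|
  Message.new(i, JSON.generate({ "job_id" => "job-#{i}" }))
end

consumer = SketchConsumer.new(messages, stop_after: 2)
consumer.consume
```

In this sketch the third message is neither executed nor marked, so a later run would presumably pick it up again from the last marked offset. Because the offset is marked after execution, a failure between those two steps would leave the job eligible to run again.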
Version data entries
20 entries across 20 versions & 1 rubygems