Sha256: 0621bde2cab10be137ac78901b3e25691d2b1374fd24ba9112f53fb639e87ab4
Contents?: true
Size: 910 Bytes
Versions: 12
Compression:
Stored size: 910 Bytes
Contents
# frozen_string_literal: true module Karafka module ActiveJob # This is the consumer for ActiveJob that eats the messages enqueued with it one after another. # It marks the offset after each message, so we make sure, none of the jobs is executed twice class Consumer < ::Karafka::BaseConsumer # Executes the ActiveJob logic # @note ActiveJob does not support batches, so we just run one message after another def consume messages.each do |message| ::ActiveJob::Base.execute( # We technically speaking could set this as deserializer and reference it from the # message instead of using the `#raw_payload`. This is not done on purpose to simplify # the ActiveJob setup here ::ActiveSupport::JSON.decode(message.raw_payload) ) mark_as_consumed(message) end end end end end
Version data entries
12 entries across 12 versions & 1 rubygems