Sha256: a32d01ad0a0e401cf27ddab8d7d1375e66d300cf7f783c8ee740095a6584f83c
Contents?: true
Size: 1.46 KB
Versions: 6
Compression:
Stored size: 1.46 KB
Contents
# frozen_string_literal: true

module Karafka
  module Processing
    module Strategies
      # No features enabled.
      # No manual offset management
      # No long running jobs
      # Nothing. Just standard, automatic flow
      module Default
        include Base

        # Apply strategy for a non-feature based flow
        FEATURES = %i[].freeze

        # No actions needed for the standard flow here
        def handle_before_enqueue
          nil
        end

        # Increment number of attempts
        def handle_before_consume
          coordinator.pause_tracker.increment
        end

        # Standard flow marks work as consumed and moves on if everything went ok.
        # If there was a processing error, we will pause and continue from the next message
        # (next that is +1 from the last one that was successfully marked as consumed)
        def handle_after_consume
          return if revoked?

          if coordinator.success?
            coordinator.pause_tracker.reset
            mark_as_consumed(messages.last)
          else
            pause(coordinator.seek_offset)
          end
        end

        # We need to always un-pause the processing in case we have lost a given partition.
        # Otherwise the underlying librdkafka would not know we may want to continue processing and
        # the pause could in theory last forever
        def handle_revoked
          resume

          coordinator.revoke
        end
      end
    end
  end
end
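The handle_after_consume flow above either marks the last message as consumed (on success) or pauses at the coordinator's seek offset (on failure). As a rough, hypothetical sketch that is not part of this gem, the Ruby below mimics that branching with stand-in FakeCoordinator, FakePauseTracker, and FakeConsumer objects so the two outcomes can be inspected directly:

# Hypothetical stand-ins, for illustration only -- not part of Karafka.
FakePauseTracker = Struct.new(:attempts) do
  def increment; self.attempts = attempts.to_i + 1; end
  def reset;     self.attempts = 0; end
end

FakeCoordinator = Struct.new(:success, :seek_offset, :pause_tracker) do
  def success?; success; end
  def revoke;   @revoked = true; end
end

class FakeConsumer
  attr_reader :actions

  def initialize(coordinator, messages)
    @coordinator = coordinator
    @messages    = messages
    @actions     = [] # records which side effect the flow would trigger
  end

  # Same decision structure as Strategies::Default#handle_after_consume,
  # with mark_as_consumed/pause replaced by recorded actions.
  def handle_after_consume
    return if revoked?

    if @coordinator.success?
      @coordinator.pause_tracker.reset
      @actions << [:mark_as_consumed, @messages.last]
    else
      @actions << [:pause, @coordinator.seek_offset]
    end
  end

  def revoked?
    false
  end
end

ok  = FakeConsumer.new(FakeCoordinator.new(true,  5, FakePauseTracker.new(0)), %i[m1 m2])
bad = FakeConsumer.new(FakeCoordinator.new(false, 5, FakePauseTracker.new(0)), %i[m1 m2])

ok.handle_after_consume
bad.handle_after_consume

p ok.actions  # => [[:mark_as_consumed, :m2]]
p bad.actions # => [[:pause, 5]]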
Version data entries
6 entries across 6 versions & 1 rubygem