lib/karafka/pro/processing/strategies/dlq_lrj.rb in karafka-2.0.22 vs lib/karafka/pro/processing/strategies/dlq_lrj.rb in karafka-2.0.23
- old
+ new
@@ -32,15 +32,17 @@
def handle_after_consume
coordinator.on_finished do |last_group_message|
if coordinator.success?
coordinator.pause_tracker.reset
+ return if coordinator.manual_pause?
+
mark_as_consumed(last_group_message) unless revoked?
seek(coordinator.seek_offset) unless revoked?
resume
elsif coordinator.pause_tracker.attempt <= topic.dead_letter_queue.max_retries
- pause(coordinator.seek_offset)
+ pause(coordinator.seek_offset, nil, false)
else
coordinator.pause_tracker.reset
unless revoked?
skippable_message = find_skippable_message