Sha256: a32d01ad0a0e401cf27ddab8d7d1375e66d300cf7f783c8ee740095a6584f83c

Contents?: true

Size: 1.46 KB

Versions: 6

Compression:

Stored size: 1.46 KB

Contents

# frozen_string_literal: true

module Karafka
  module Processing
    module Strategies
      # No features enabled.
      # No manual offset management
      # No long running jobs
      # Nothing. Just standard, automatic flow
      module Default
        include Base

        # Apply strategy for a non-feature based flow
        FEATURES = %i[].freeze

        # No actions needed for the standard flow here
        def handle_before_enqueue
          nil
        end

        # Increment number of attempts
        def handle_before_consume
          coordinator.pause_tracker.increment
        end

        # Standard flow marks work as consumed and moves on if everything went ok.
        # If there was a processing error, we will pause and continue from the next message
        # (next that is +1 from the last one that was successfully marked as consumed)
        def handle_after_consume
          return if revoked?

          if coordinator.success?
            coordinator.pause_tracker.reset

            mark_as_consumed(messages.last)
          else
            pause(coordinator.seek_offset)
          end
        end

        # We need to always un-pause the processing in case we have lost a given partition.
        # Otherwise the underlying librdkafka would not know we may want to continue processing and
        # the pause could in theory last forever
        def handle_revoked
          resume

          coordinator.revoke
        end
      end
    end
  end
end

Version data entries

6 entries across 6 versions & 1 rubygems

Version Path
karafka-2.0.21 lib/karafka/processing/strategies/default.rb
karafka-2.0.20 lib/karafka/processing/strategies/default.rb
karafka-2.0.19 lib/karafka/processing/strategies/default.rb
karafka-2.0.18 lib/karafka/processing/strategies/default.rb
karafka-2.0.17 lib/karafka/processing/strategies/default.rb
karafka-2.0.16 lib/karafka/processing/strategies/default.rb