# frozen_string_literal: true # This Karafka component is a Pro component. # All of the commercial components are present in the lib/karafka/pro directory of this # repository and their usage requires commercial license agreement. # # Karafka has also commercial-friendly license, commercial support and commercial components. # # By sending a pull request to the pro components, you are agreeing to transfer the copyright of # your code to Maciej Mensfeld. module Karafka module Pro # Karafka PRO consumer. # # If you use PRO, all your consumers should inherit (indirectly) from it. # # @note In case of using lrj, manual pausing may not be the best idea as resume needs to happen # after each batch is processed. class BaseConsumer < Karafka::BaseConsumer # Pause for tops 31 years MAX_PAUSE_TIME = 1_000_000_000_000 private_constant :MAX_PAUSE_TIME # Pauses processing of a given partition until we're done with the processing # This ensures, that we can easily poll not reaching the `max.poll.interval` def on_before_consume return unless topic.long_running_job? # This ensures, that when running LRJ with VP, things operate as expected coordinator.on_started do |first_group_message| # Pause at the first message in a batch. That way in case of a crash, we will not loose # any messages pause(first_group_message.offset, MAX_PAUSE_TIME) end end # Runs extra logic after consumption that is related to handling long-running jobs # @note This overwrites the '#on_after_consume' from the base consumer def on_after_consume coordinator.on_finished do |first_group_message, last_group_message| on_after_consume_regular(first_group_message, last_group_message) end end # Trigger method for running on partition revocation. # # @private def on_revoked # We do not want to resume on revocation in case of a LRJ. # For LRJ we resume after the successful processing or do a backoff pause in case of a # failure. Double non-blocking resume could cause problems in coordination. resume unless topic.long_running_job? coordinator.revoke Karafka.monitor.instrument('consumer.revoked', caller: self) do revoked end rescue StandardError => e Karafka.monitor.instrument( 'error.occurred', error: e, caller: self, type: 'consumer.revoked.error' ) end private # Handles the post-consumption flow depending on topic settings # # @param first_message [Karafka::Messages::Message] # @param last_message [Karafka::Messages::Message] def on_after_consume_regular(first_message, last_message) if coordinator.success? coordinator.pause_tracker.reset # We use the non-blocking one here. If someone needs the blocking one, can implement it # with manual offset management # Mark as consumed only if manual offset management is not on mark_as_consumed(last_message) unless topic.manual_offset_management? || revoked? # If this is not a long-running job there is nothing for us to do here return unless topic.long_running_job? # Once processing is done, we move to the new offset based on commits # Here, in case manual offset management is off, we have the new proper offset of a # first message from another batch from `@seek_offset`. If manual offset management # is on, we move to place where the user indicated it was finished. This can create an # interesting (yet valid) corner case, where with manual offset management on and no # marking as consumed, we end up with an infinite loop processing same messages over and # over again seek(@seek_offset || first_message.offset) resume else # If processing failed, we need to pause # For long running job this will overwrite the default never-ending pause and will cause # the processing th keep going after the error backoff pause(@seek_offset || first_message.offset) end end end end end