# frozen_string_literal: true

# This Karafka component is a Pro component under a commercial license.
# This Karafka component is NOT licensed under LGPL.
#
# All of the commercial components are present in the lib/karafka/pro directory of this
# repository and their usage requires commercial license agreement.
#
# Karafka has also commercial-friendly license, commercial support and commercial components.
#
# By sending a pull request to the pro components, you are agreeing to transfer the copyright of
# your code to Maciej Mensfeld.

module Karafka
  module Pro
    module Processing
      module Strategies
        # VP starting strategies
        module Vp
          # Just Virtual Partitions enabled
          module Default
            # This flow is exactly the same as the default one because the default one is
            # wrapped with `coordinator#on_finished`
            include Strategies::Default

            # Features for this strategy
            FEATURES = %i[
              virtual_partitions
            ].freeze

            # Marks the message as consumed.
            #
            # @param message [Karafka::Messages::Message] message we want to mark as consumed
            # @param offset_metadata [String, nil]
            # @note This virtual offset management uses the regular default marking API
            #   underneath. We do not alter the "real" marking API, as VPs are just one of many
            #   cases we want to support and we do not want to impact them with collective
            #   offsets management
            def mark_as_consumed(message, offset_metadata = @_current_offset_metadata)
              if @_in_transaction && !collapsed?
                mark_in_transaction(message, offset_metadata, true)
              elsif collapsed?
                super
              else
                manager = coordinator.virtual_offset_manager

                coordinator.synchronize do
                  manager.mark(message, offset_metadata)

                  # If this is the last marking on a finished flow, we can use the original last
                  # message and in order to do so, we need to mark all previous messages as
                  # consumed, as otherwise the computed offset could be different.
                  # We mark only up to our offset in case of a DLQ flow or similar, where we do
                  # not want to mark everything but only up to the expected location.
                  manager.mark_until(message, offset_metadata) if coordinator.finished?

                  return revoked? unless manager.markable?

                  manager.markable? ? super(*manager.markable) : revoked?
                end
              end
            ensure
              @_current_offset_metadata = nil
            end

            # Marks the message as consumed in a blocking way.
            #
            # @param message [Karafka::Messages::Message] message we want to mark as consumed
            # @param offset_metadata [String, nil]
            def mark_as_consumed!(message, offset_metadata = @_current_offset_metadata)
              if @_in_transaction && !collapsed?
                mark_in_transaction(message, offset_metadata, false)
              elsif collapsed?
                super
              else
                manager = coordinator.virtual_offset_manager

                coordinator.synchronize do
                  manager.mark(message, offset_metadata)
                  manager.mark_until(message, offset_metadata) if coordinator.finished?
                  manager.markable? ? super(*manager.markable) : revoked?
                end
              end
            ensure
              @_current_offset_metadata = nil
            end
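
            # Illustration only, not part of the strategy API: a minimal sketch of the
            # consumer-side flow that ends up routing markings through `#mark_in_transaction`
            # below. The topic name and the payload source are assumptions made for this
            # example.
            #
            #   transaction do
            #     messages.each do |message|
            #       producer.produce_async(topic: 'events', payload: message.raw_payload)
            #     end
            #
            #     mark_as_consumed(messages.last)
            #   end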

            # Stores the next offset for processing inside of the transaction when collapsed and
            # accumulates marking as consumed in the local buffer.
            #
            # Due to the nature of VPs we cannot provide full EOS support, but we can simulate
            # it, making sure that no offsets are stored unless the transaction is finished. We
            # do it by accumulating the post-transaction marking requests and, after it is
            # successfully done, we mark each as consumed. On errors this effectively "rolls
            # back" the state and prevents offset storage.
            #
            # Since the EOS here is "weak", we do not have to worry about race conditions and we
            # do not need any mutexes.
            #
            # @param message [Messages::Message] message we want to commit inside of a transaction
            # @param offset_metadata [String, nil] offset metadata or nil if none
            # @param async [Boolean] should we mark in async or sync way (applicable only to post
            #   transaction state synchronization usage as within transaction it is always sync)
            def mark_in_transaction(message, offset_metadata, async)
              raise Errors::TransactionRequiredError unless @_in_transaction
              # Prevents attempts at offset storage when we no longer own the assignment
              raise Errors::AssignmentLostError if revoked?

              return super if collapsed?

              @_transaction_marked << [message, offset_metadata, async]
            end

            # @return [Boolean] is the virtual processing collapsed in the context of the given
            #   consumer
            def collapsed?
              coordinator.collapsed?
            end

            # @param offset [Integer] first offset from which we should no longer operate in a
            #   collapsed mode
            # @note Keep in mind that if a batch contains this offset but also earlier messages
            #   that should be collapsed, all will continue to operate in a collapsed mode until
            #   the first full batch with only messages that should not be collapsed.
            def collapse_until!(offset)
              coordinator.collapse_until!(offset)
            end

            # @return [Boolean] true if any of the virtual partitions we're operating on in the
            #   entangled mode has already failed and we know we are failing collectively.
            #   Useful for early stopping to minimize the number of things processed twice.
            #
            # @note We've named it `#failing?` instead of `#failure?` because it aims to be used
            #   from within virtual partitions where we want a notion of collective failing, not
            #   just one "local" to our processing. We "are" failing together with other virtual
            #   partitions raising an error, but locally we are still processing.
            def failing?
              coordinator.failure?
            end

            # Allows for cross-virtual-partition consumer locks
            #
            # This is not needed in the non-VP flows except LRJ, because there is always only one
            # consumer per partition at the same time, so no coordination is needed directly for
            # the end users. With LRJ it is needed and provided in the `LRJ::Default` strategy,
            # because lifecycle events on revocation can run in parallel to the LRJ job as it is
            # non-blocking.
            #
            # @param block [Proc] block we want to run in a mutex to prevent race-conditions
            def synchronize(&block)
              coordinator.shared_mutex.synchronize(&block)
            end

            private

            # Prior to adding work to the queue, registers all the message offsets into the
            # virtual offset group.
            #
            # @note This can be done without the mutex, because it happens from the same thread
            #   for all the work (listener thread)
            def handle_before_schedule_consume
              super

              # We should not register offsets in the virtual manager when in collapse, as the
              # virtual manager is not used then for offsets materialization.
              #
              # If we did, it would cause increased storage in case of endless errors that are
              # being retried in collapse without a DLQ.
              return if collapsed?

              coordinator.virtual_offset_manager.register(
                messages.map(&:offset)
              )
            end
          end
        end
      end
    end
  end
end
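
# Usage sketch, illustration only and not part of this file: a consumer running under this
# strategy can use `#synchronize` to guard state shared across virtual partitions of the same
# topic partition while marking progress as usual. The consumer class name and the tracked
# state are assumptions made for this example.
#
#   class EventsConsumer < ApplicationConsumer
#     def consume
#       messages.each do |message|
#         synchronize { (@offsets ||= []) << message.offset }
#
#         mark_as_consumed(message)
#       end
#     end
#   end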