# frozen_string_literal: true

# This Karafka component is a Pro component under a commercial license.
# This Karafka component is NOT licensed under LGPL.
#
# All of the commercial components are present in the lib/karafka/pro directory of this
# repository and their usage requires commercial license agreement.
#
# Karafka has also commercial-friendly license, commercial support and commercial components.
#
# By sending a pull request to the pro components, you are agreeing to transfer the copyright of
# your code to Maciej Mensfeld.

module Karafka
  module Pro
    module Processing
      # Pro coordinator that provides extra orchestration methods useful for parallel processing
      # within the same partition
      class Coordinator < ::Karafka::Processing::Coordinator
        extend Forwardable

        def_delegators :@collapser, :collapsed?, :collapse_until!

        attr_reader :filter, :virtual_offset_manager, :shared_mutex, :errors_tracker

        # @param args [Object] anything the base coordinator accepts
        def initialize(*args)
          super

          @executed = []
          @errors_tracker = Coordinators::ErrorsTracker.new
          @flow_mutex = Mutex.new
          # Lock for user code synchronization
          # We do not want to mix coordinator lock with the user lock not to create cases where
          # user imposed lock would lock the internal operations of Karafka
          # This shared lock can be used by the end user as it is not used internally by the
          # framework and can be used for user-facing locking
          @shared_mutex = Mutex.new
          @collapser = Collapser.new
          @filter = Coordinators::FiltersApplier.new(self)

          return unless topic.virtual_partitions?

          @virtual_offset_manager = Coordinators::VirtualOffsetManager.new(
            topic.name,
            partition,
            topic.virtual_partitions.offset_metadata_strategy
          )

          # We register our own "internal" filter to support filtering of messages that were marked
          # as consumed virtually
          @filter.filters << Filters::VirtualLimiter.new(
            @virtual_offset_manager,
            @collapser
          )
        end

        # Starts the coordination process
        #
        # @param messages [Array] messages for which processing we are
        #   going to coordinate.
        def start(messages)
          super

          @collapser.refresh!(messages.first.offset)

          @filter.apply!(messages)

          # Do not clear coordinator errors storage when we are retrying, so we can reference the
          # errors that have happened during recovery. This can be useful for implementing custom
          # flows. There can be more errors than one when running with virtual partitions so we
          # need to make sure we collect them all. Under collapse when we reference a given
          # consumer we should be able to get all the errors and not just first/last.
          #
          # @note We use zero as the attempt mark because we are not "yet" in the attempt 1
          @errors_tracker.clear if attempt.zero?
          @executed.clear

          # We keep the old processed offsets until the collapsing is done and regular processing
          # with virtualization is restored
          @virtual_offset_manager.clear if topic.virtual_partitions? && !collapsed?

          @last_message = messages.last
        end

        # Sets the consumer failure status and additionally starts the collapse until
        #
        # @param consumer [Karafka::BaseConsumer] consumer that failed
        # @param error [StandardError] error from the failure
        def failure!(consumer, error)
          super
          @errors_tracker << error
          collapse_until!(@last_message.offset + 1)
        end

        # @return [Boolean] did any of the filters apply any logic that would cause us to run
        #   the filtering flow
        def filtered?
          @filter.applied?
        end

        # @return [Boolean] is the coordinated work finished or not
        # @note Used only in the consume operation context
        def finished?
          @running_jobs[:consume].zero?
        end

        # Runs synchronized code once for a collective of virtual partitions prior to work being
        # enqueued
        def on_enqueued
          @flow_mutex.synchronize do
            return unless executable?(:on_enqueued)

            yield(@last_message)
          end
        end

        # Runs given code only once per all the coordinated jobs upon starting first of them
        def on_started
          @flow_mutex.synchronize do
            return unless executable?(:on_started)

            yield(@last_message)
          end
        end

        # Runs given code once when all the work that is supposed to be coordinated is finished
        # It runs once per all the coordinated jobs and should be used to run any type of post
        # jobs coordination processing execution
        def on_finished
          @flow_mutex.synchronize do
            return unless finished?
            return unless executable?(:on_finished)

            yield(@last_message)
          end
        end

        # Runs once after a partition is revoked
        def on_revoked
          @flow_mutex.synchronize do
            return unless executable?(:on_revoked)

            yield(@last_message)
          end
        end

        # @param interval [Integer] milliseconds of activity
        # @return [Boolean] was this partition in activity within last `interval` milliseconds
        # @note Will return true also if currently active
        def active_within?(interval)
          # it's always active if there's any job related to this coordinator that is still
          # enqueued or running
          #
          # FIX: the block form `any?(&:positive?)` is required here. The previous
          # `any?(:positive?)` passed the symbol as a pattern matched via `Symbol#===`
          # (plain equality), which can never be true for the integer job counters
          # (see `finished?`, which calls `.zero?` on them), so this short-circuit
          # was always false.
          return true if @running_jobs.values.any?(&:positive?)

          # Otherwise we check last time any job of this coordinator was active
          @changed_at + interval > monotonic_now
        end

        private

        # Checks if given action is executable once. If it is and true is returned, this method
        # will return false next time it is used.
        #
        # @param action [Symbol] what action we want to perform
        # @return [Boolean] true if we can
        # @note This method needs to run behind a mutex.
        def executable?(action)
          return false if @executed.include?(action)

          @executed << action

          true
        end
      end
    end
  end
end