Sha256: f3e445efa620f35aa471bf4bd2164c276bca8e7b06252d21247d2fb7814cbfaa

Contents?: true

Size: 1.98 KB

Versions: 23

Compression:

Stored size: 1.98 KB

Contents

# frozen_string_literal: true

# This Karafka component is a Pro component.
# All of the commercial components are present in the lib/karafka/pro directory of this
# repository and their usage requires commercial license agreement.
#
# Karafka has also commercial-friendly license, commercial support and commercial components.
#
# By sending a pull request to the pro components, you are agreeing to transfer the copyright of
# your code to Maciej Mensfeld.

module Karafka
  module Pro
    module Processing
      # Optimized scheduler that takes into consideration the execution time needed to process
      # messages from given topic partitions. It uses the non-preemptive LJF (Longest Job First)
      # algorithm.
      #
      # This scheduler is designed to optimize execution times of jobs that perform IO operations,
      # as when taking IO into consideration, we can achieve better parallel processing.
      #
      # This scheduler can also work with virtual partitions.
      #
      # Aside from consumption jobs, other jobs do not run often, thus we can leave them with
      # the FIFO ordering provided by the default Karafka scheduler.
      class Scheduler < ::Karafka::Processing::Scheduler
        # Schedules jobs in the LJF order for consumption
        #
        # @param queue [Karafka::Processing::JobsQueue] queue where we want to put the jobs
        # @param jobs_array [Array<Karafka::Processing::Jobs::Base>] jobs we want to schedule
        #
        def schedule_consumption(queue, jobs_array)
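          pt = PerformanceTracker.instance

          ordered = []

          jobs_array.each do |job|
            messages = job.messages
            # All messages in a job come from the same topic partition, so the first one is
            # enough to look up the tracked processing time
            message = messages.first

            # Estimated job cost: p95 processing time of a single message times the batch size
            cost = pt.processing_time_p95(message.topic, message.partition) * messages.size

            ordered << [job, cost]
          end

          # Sort by cost descending (longest job first) and drop the cost, keeping only the jobs
          ordered.sort_by!(&:last)
          ordered.reverse!
          ordered.map!(&:first)

          ordered.each do |job|
            queue << job
          end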
        end
      end
    end
  end
end
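
The LJF ordering used above can be tried in isolation. The following is a minimal sketch, not the Karafka implementation: `Message`, `Job`, `FakeTracker` and `ljf_order` are hypothetical stand-ins introduced for illustration, and the p95 values are made-up numbers. It only assumes, as the scheduler above does, that a job's cost is the tracked p95 time of one message multiplied by the number of messages in the job.

```ruby
# frozen_string_literal: true

# Hypothetical stand-ins for illustration only; these are not Karafka classes.
Message = Struct.new(:topic, :partition)
Job = Struct.new(:name, :messages)

# Stand-in for a performance tracker. Returns an assumed p95 processing time
# (in ms, per message) for a given topic partition, or 0 when nothing is known.
class FakeTracker
  def initialize(p95_by_partition)
    @p95_by_partition = p95_by_partition
  end

  def processing_time_p95(topic, partition)
    @p95_by_partition.fetch([topic, partition], 0)
  end
end

# Returns jobs ordered longest-first by estimated cost
# (p95 time of a single message * number of messages in the job).
def ljf_order(jobs, tracker)
  jobs.sort_by do |job|
    first = job.messages.first
    # Negate the cost so that sort_by yields descending order
    -(tracker.processing_time_p95(first.topic, first.partition) * job.messages.size)
  end
end

tracker = FakeTracker.new(
  {
    ['events', 0] => 5,
    ['events', 1] => 20,
    ['logs', 0] => 1
  }
)

jobs = [
  Job.new('a', Array.new(10) { Message.new('events', 0) }), # cost: 5 * 10 = 50
  Job.new('b', Array.new(4) { Message.new('events', 1) }),  # cost: 20 * 4 = 80
  Job.new('c', Array.new(30) { Message.new('logs', 0) })    # cost: 1 * 30 = 30
]

puts ljf_order(jobs, tracker).map(&:name).inspect
# prints ["b", "a", "c"]
```

Job `b` is enqueued first even though it has the fewest messages, because its per-message p95 time makes it the most expensive. Starting the longest jobs early lets the shorter ones fill the remaining worker time, which is the reason for choosing LJF over FIFO for IO-heavy consumption.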

Version data entries

23 entries across 23 versions & 1 rubygems

Version Path
karafka-2.0.0.rc2 lib/karafka/pro/processing/scheduler.rb
karafka-2.0.0.rc1 lib/karafka/pro/processing/scheduler.rb
karafka-2.0.0.beta5 lib/karafka/pro/processing/scheduler.rb