Sha256: 4ec0598a9cbcf671207dcb1f2784e0d2d0966203843c2a446b20b5ccedd85f50

Contents?: true

Size: 722 Bytes

Versions: 1

Compression:

Stored size: 722 Bytes

Contents

# frozen_string_literal: true

module Karafka
  # FIFO scheduler for messages coming from various topics and partitions
  class Scheduler
    # Yields messages from each topic partition in FIFO order
    #
    # @param messages_buffer [Karafka::Connection::MessagesBuffer] messages buffer with data from
    #   multiple topics and partitions
    # @yieldparam [String] topic name
    # @yieldparam [Integer] partition number
    # @yieldparam [Array<Rdkafka::Consumer::Message>] topic partition aggregated results
    def call(messages_buffer)
      messages_buffer.each do |topic, partitions|
        partitions.each do |partition, messages|
          yield(topic, partition, messages)
        end
      end
    end
  end
end
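
The nested iteration above can be tried out in isolation. The following is a minimal sketch, not the way Karafka itself wires the scheduler: it repeats the class so that it runs without the gem installed, and it assumes a plain nested Hash (topic => partition => messages) as a hypothetical stand-in for Karafka::Connection::MessagesBuffer, with strings in place of Rdkafka::Consumer::Message objects.

```ruby
# frozen_string_literal: true

# Local copy of the scheduler so this sketch runs without the karafka gem.
module Karafka
  class Scheduler
    def call(messages_buffer)
      messages_buffer.each do |topic, partitions|
        partitions.each do |partition, messages|
          yield(topic, partition, messages)
        end
      end
    end
  end
end

# Hypothetical stand-in for the messages buffer (an assumption, not the real
# Karafka::Connection::MessagesBuffer): a Hash keyed by topic name, then by
# partition number. Plain strings stand in for Rdkafka message objects.
buffer = {
  'events' => { 0 => %w[e0-a e0-b], 1 => %w[e1-a] },
  'logs' => { 0 => %w[l0-a] }
}

# Collect what the scheduler yields, in the order it yields it.
scheduled = []
Karafka::Scheduler.new.call(buffer) do |topic, partition, messages|
  scheduled << [topic, partition, messages]
end

scheduled.each do |topic, partition, messages|
  puts "#{topic}/#{partition}: #{messages.join(', ')}"
end
```

With this stand-in, the order of the yields follows the insertion order of the Hash, because Ruby hashes iterate in insertion order. Each topic partition is handed to the block once, together with all messages buffered for it.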

Version data entries

1 entry across 1 version & 1 rubygem

Version Path
karafka-2.0.0.beta1 lib/karafka/scheduler.rb