Sha256: 4ec0598a9cbcf671207dcb1f2784e0d2d0966203843c2a446b20b5ccedd85f50
Contents?: true
Size: 722 Bytes
Versions: 1
Compression:
Stored size: 722 Bytes
Contents
# frozen_string_literal: true

module Karafka
  # FIFO scheduler for messages coming from various topics and partitions
  class Scheduler
    # Yields messages from partitions in the fifo order
    #
    # @param messages_buffer [Karafka::Connection::MessagesBuffer] messages buffer with data from
    #   multiple topics and partitions
    # @yieldparam [String] topic name
    # @yieldparam [Integer] partition number
    # @yieldparam [Array<Rdkafka::Consumer::Message>] topic partition aggregated results
    def call(messages_buffer)
      messages_buffer.each do |topic, partitions|
        partitions.each do |partition, messages|
          yield(topic, partition, messages)
        end
      end
    end
  end
end
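The scheduler above walks the buffer topic by topic, then partition by partition, and yields each batch in the order it was stored. The following is a minimal sketch of that behavior, not code from the gem. It assumes a plain nested Hash (`topic => { partition => messages }`) as a stand-in for `Karafka::Connection::MessagesBuffer`, strings in place of `Rdkafka::Consumer::Message` objects, and a hypothetical `FifoScheduler` class that copies the method body so the example runs without Karafka installed.

```ruby
# frozen_string_literal: true

# Hypothetical stand-in that mirrors Karafka::Scheduler#call,
# so the sketch runs without the karafka gem.
class FifoScheduler
  def call(messages_buffer)
    messages_buffer.each do |topic, partitions|
      partitions.each do |partition, messages|
        yield(topic, partition, messages)
      end
    end
  end
end

# Assumed buffer shape: topic => { partition => [messages] }.
# Strings stand in for real message objects.
buffer = {
  'events' => { 0 => %w[e0-a e0-b], 1 => %w[e1-a] },
  'logs' => { 0 => %w[l0-a] }
}

# Record the order in which batches are yielded.
order = []
FifoScheduler.new.call(buffer) do |topic, partition, messages|
  order << [topic, partition, messages.size]
end

order.each do |topic, partition, count|
  puts "#{topic}/#{partition}: #{count} message(s)"
end
```

Because a Ruby Hash iterates in insertion order, the batches in this sketch come out in the order the topics and partitions were added to the buffer. Whether the real buffer gives the same ordering guarantee depends on its own implementation.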
Version data entries
1 entry across 1 version & 1 rubygem
Version | Path |
---|---|
karafka-2.0.0.beta1 | lib/karafka/scheduler.rb |