Sha256: 0ac54fb77e174c799a14db8246f7a156af17f5bd7800cecd5ede249fa71eb34f

Contents?: true

Size: 1.91 KB

Versions: 5

Compression:

Stored size: 1.91 KB

Contents

# frozen_string_literal: true

module Karafka
  module Connection
    # Buffer for messages.
    # When message is added to this buffer, it gets assigned to an array with other messages from
    # the same topic and partition.
    #
    # Internally the data is kept in a two-level hash (topic => partition => messages array)
    # whose missing keys are created lazily on first access.
    #
    # @note This buffer is NOT threadsafe.
    class MessagesBuffer
      # @return [Integer] number of messages added since creation or since the last {#clear}
      attr_reader :size

      # @return [Karafka::Connection::MessagesBuffer] buffer instance
      def initialize
        @size = 0
        # Nested auto-vivifying hashes: the first access to a topic creates its partitions hash
        # and the first access to a partition creates its own, separate messages array.
        @groups = Hash.new do |topic_groups, topic|
          topic_groups[topic] = Hash.new do |partition_groups, partition|
            partition_groups[partition] = []
          end
        end
      end

      # Iterates over aggregated data providing messages per topic partition.
      #
      # Topics and partitions are yielded in the order in which they were first added, as this
      # is the iteration order of the underlying hashes.
      #
      # @yieldparam [String] topic name
      # @yieldparam [Integer] partition number
      # @yieldparam [Array<Rdkafka::Consumer::Message>] topic partition aggregated results
      # @return [Enumerator] when no block is given, an enumerator over the same
      #   topic, partition and messages triples
      def each
        # Without this guard a block-less call would raise LocalJumpError on the first yield
        return to_enum(:each) unless block_given?

        @groups.each do |topic, partitions|
          partitions.each do |partition, messages|
            yield(topic, partition, messages)
          end
        end
      end

      # Adds a message to the buffer.
      #
      # @param message [Rdkafka::Consumer::Message] raw rdkafka message
      # @return [Array<Rdkafka::Consumer::Message>] given partition topic sub-buffer array
      def <<(message)
        @size += 1
        # Array#<< returns the array itself, so the caller receives the whole sub-buffer
        @groups[message.topic][message.partition] << message
      end

      # Removes all the data from the buffer.
      #
      # After this call the size is zero and {#each} yields nothing. Topic keys stay in the
      # groups hash, each pointing to an emptied partitions hash.
      #
      # @note We do not clear the whole groups hash but rather we clear the partition hashes, so
      #   we save ourselves some objects allocations. We cannot clear the underlying arrays as they
      #   may be used in other threads for data processing, thus if we would clear it, we could
      #   potentially clear a raw messages array for a job that is in the jobs queue.
      def clear
        @size = 0
        # Drops the references to the messages arrays without mutating the arrays themselves
        @groups.each_value(&:clear)
      end
    end
  end
end

Version data entries

5 entries across 5 versions & 1 rubygem

Version Path
karafka-2.0.0.alpha5 lib/karafka/connection/messages_buffer.rb
karafka-2.0.0.alpha4 lib/karafka/connection/messages_buffer.rb
karafka-2.0.0.alpha3 lib/karafka/connection/messages_buffer.rb
karafka-2.0.0.alpha2 lib/karafka/connection/messages_buffer.rb
karafka-2.0.0.alpha1 lib/karafka/connection/messages_buffer.rb