Sha256: 2e437a5b60b2437ae2c7cc3e7850a918f88f2a861afdc09f84e9bd73a2a07102

Contents?: true

Size: 1.91 KB

Versions: 30

Compression:

Stored size: 1.91 KB

Contents

require "kafka/protocol/message"

module Kafka

  # Buffers messages for specific topics/partitions.
  class MessageBuffer
    include Enumerable

    attr_reader :size, :bytesize

    def initialize
      @buffer = {}
      @size = 0
      @bytesize = 0
    end

    def write(value:, key:, topic:, partition:, create_time: Time.now)
      message = Protocol::Message.new(key: key, value: value, create_time: create_time)

      buffer_for(topic, partition) << message

      @size += 1
      @bytesize += message.bytesize
    end

    def concat(messages, topic:, partition:)
      buffer_for(topic, partition).concat(messages)

      @size += messages.count
      @bytesize += messages.map(&:bytesize).reduce(0, :+)
    end

    def to_h
      @buffer
    end

    def empty?
      @buffer.empty?
    end

    def each
      @buffer.each do |topic, messages_for_topic|
        messages_for_topic.each do |partition, messages_for_partition|
          yield topic, partition, messages_for_partition
        end
      end
    end

    # Clears buffered messages for the given topic and partition.
    #
    # @param topic [String] the name of the topic.
    # @param partition [Integer] the partition id.
    #
    # @return [nil]
    def clear_messages(topic:, partition:)
      return unless @buffer.key?(topic) && @buffer[topic].key?(partition)

      @size -= @buffer[topic][partition].count
      @bytesize -= @buffer[topic][partition].map(&:bytesize).reduce(0, :+)

      @buffer[topic].delete(partition)
      @buffer.delete(topic) if @buffer[topic].empty?
    end

    def messages_for(topic:, partition:)
      buffer_for(topic, partition)
    end

    # Clears messages across all topics and partitions.
    #
    # @return [nil]
    def clear
      @buffer = {}
      @size = 0
      @bytesize = 0
    end

    private

    def buffer_for(topic, partition)
      @buffer[topic] ||= {}
      @buffer[topic][partition] ||= []
    end
  end
end

Version data entries

30 entries across 30 versions & 1 rubygems

Version Path
ruby-kafka-0.6.0.beta4 lib/kafka/message_buffer.rb
ruby-kafka-0.6.0.beta3 lib/kafka/message_buffer.rb
ruby-kafka-0.6.0.beta2 lib/kafka/message_buffer.rb
ruby-kafka-0.6.0.beta1 lib/kafka/message_buffer.rb
ruby-kafka-0.5.5 lib/kafka/message_buffer.rb
ruby-kafka-0.5.4 lib/kafka/message_buffer.rb
ruby-kafka-0.5.4.beta1 lib/kafka/message_buffer.rb
ruby-kafka-0.5.3 lib/kafka/message_buffer.rb
ruby-kafka-0.5.2 lib/kafka/message_buffer.rb
ruby-kafka-0.5.2.beta3 lib/kafka/message_buffer.rb
ruby-kafka-0.5.2.beta2 lib/kafka/message_buffer.rb
ruby-kafka-0.5.2.beta1 lib/kafka/message_buffer.rb
ruby-kafka-0.5.1 lib/kafka/message_buffer.rb
ruby-kafka-0.5.1.beta2 lib/kafka/message_buffer.rb
ruby-kafka-0.5.1.beta1 lib/kafka/message_buffer.rb
ruby-kafka-0.4.4 lib/kafka/message_buffer.rb
ruby-kafka-0.5.0 lib/kafka/message_buffer.rb
ruby-kafka-0.5.0.beta6 lib/kafka/message_buffer.rb
ruby-kafka-0.5.0.beta5 lib/kafka/message_buffer.rb
ruby-kafka-0.5.0.beta4 lib/kafka/message_buffer.rb