Sha256: df712d40eada5b77cd3d155dc1653a531281766986d6313ae7abafad26e3f001

Contents?: true

Size: 1.08 KB

Versions: 5

Compression:

Stored size: 1.08 KB

Contents

require "zlib"

module Kafka
  module Protocol

    # == API Specification
    #
    #     Message => Crc MagicByte Attributes Key Value
    #         Crc => int32
    #         MagicByte => int8
    #         Attributes => int8
    #         Key => bytes
    #         Value => bytes
    #
    class Message
      # Value written as the MagicByte field of every encoded message.
      MAGIC_BYTE = 0

      attr_reader :key, :value, :attributes

      # @param key [Object] the message key, written with `write_bytes`.
      # @param value [Object] the message value, written with `write_bytes`.
      # @param attributes [Integer] value written as the Attributes int8
      #   field; defaults to 0.
      def initialize(key:, value:, attributes: 0)
        @key = key
        @value = value
        @attributes = attributes
      end

      # Writes the message to +encoder+: first the CRC32 checksum of the
      # encoded body (magic byte, attributes, key, value) as an int32, then
      # the body itself.
      #
      # @param encoder [#write_int32, #write] the destination encoder.
      def encode(encoder)
        data = encode_without_crc
        crc = Zlib.crc32(data)

        encoder.write_int32(crc)
        encoder.write(data)
      end

      # Compares two messages by key, value and attributes.
      #
      # Objects that do not expose all three readers (e.g. +nil+ or a String)
      # are never equal to a message; previously such a comparison raised
      # NoMethodError.
      #
      # @param other [Object] the object to compare against.
      # @return [Boolean]
      def ==(other)
        return false unless %i[key value attributes].all? { |reader| other.respond_to?(reader) }

        @key == other.key && @value == other.value && @attributes == other.attributes
      end

      private

      # Encodes every field except the CRC into an in-memory buffer, so that
      # the checksum can be computed over the result before anything is
      # written to the real encoder.
      #
      # @return [String] the encoded magic byte, attributes, key and value.
      def encode_without_crc
        buffer = StringIO.new
        encoder = Encoder.new(buffer)

        encoder.write_int8(MAGIC_BYTE)
        encoder.write_int8(@attributes)
        encoder.write_bytes(@key)
        encoder.write_bytes(@value)

        buffer.string
      end
    end
  end
end

Version data entries

5 entries across 5 versions & 1 rubygems

Version Path
ruby-kafka-0.1.0.pre.beta3 lib/kafka/protocol/message.rb
ruby-kafka-0.1.0.pre.beta2 lib/kafka/protocol/message.rb
ruby-kafka-0.1.0.pre.beta1 lib/kafka/protocol/message.rb
ruby-kafka-0.1.0.pre.alpha2 lib/kafka/protocol/message.rb
ruby-kafka-0.1.0.pre.alpha lib/kafka/protocol/message.rb