Sha256: d6c793199f9a66b6a2fa492f558465fcd5a6ac4ffffcec02ee9246814b28a16e
Contents?: true
Size: 1.91 KB
Versions: 39
Compression:
Stored size: 1.91 KB
Contents
require "kafka/protocol/message_set" module Kafka module Protocol # A response to a fetch request. # # ## API Specification # # FetchResponse => [TopicName [Partition ErrorCode HighwaterMarkOffset MessageSetSize MessageSet]] # TopicName => string # Partition => int32 # ErrorCode => int16 # HighwaterMarkOffset => int64 # MessageSetSize => int32 # class FetchResponse class FetchedPartition attr_reader :partition, :error_code attr_reader :highwater_mark_offset, :messages def initialize(partition:, error_code:, highwater_mark_offset:, messages:) @partition = partition @error_code = error_code @highwater_mark_offset = highwater_mark_offset @messages = messages end end class FetchedTopic attr_reader :name, :partitions def initialize(name:, partitions:) @name = name @partitions = partitions end end attr_reader :topics def initialize(topics: []) @topics = topics end def self.decode(decoder) topics = decoder.array do topic_name = decoder.string partitions = decoder.array do partition = decoder.int32 error_code = decoder.int16 highwater_mark_offset = decoder.int64 message_set_decoder = Decoder.from_string(decoder.bytes) message_set = MessageSet.decode(message_set_decoder) FetchedPartition.new( partition: partition, error_code: error_code, highwater_mark_offset: highwater_mark_offset, messages: message_set.messages, ) end FetchedTopic.new( name: topic_name, partitions: partitions, ) end new(topics: topics) end end end end
Version data entries
39 entries across 39 versions & 1 rubygems