Sha256: bb585e73abca33645495ade81958fabdef6162b71f1ca84ce9191fe6a8b736d9
Contents?: true
Size: 1.15 KB
Versions: 55
Compression:
Stored size: 1.15 KB
Contents
module Kafka
  module Protocol
    # Decoded form of an offset fetch response: a two-level mapping from
    # topic name to partition id to the offset details reported for that
    # partition.
    class OffsetFetchResponse
      # Value object holding what the response carries for one partition:
      # the offset, its attached metadata string, and an error code.
      class PartitionOffsetInfo
        attr_reader :offset
        attr_reader :metadata
        attr_reader :error_code

        # @param offset the offset value read from the response
        # @param metadata the metadata string read from the response
        # @param error_code the error code read from the response
        def initialize(offset:, metadata:, error_code:)
          @offset = offset
          @metadata = metadata
          @error_code = error_code
        end
      end

      # Mapping of topic name => { partition id => PartitionOffsetInfo }.
      attr_reader :topics

      # @param topics mapping of topic name to a per-partition mapping of
      #   PartitionOffsetInfo objects
      def initialize(topics:)
        @topics = topics
      end

      # Looks up the offset recorded for a topic/partition pair.
      #
      # The topic lookup uses a strict `fetch`, so an unknown topic raises
      # KeyError. An unknown partition within a known topic is not an error
      # and yields -1 instead.
      #
      # When an entry exists, its error code is handed to
      # Protocol.handle_error before the offset is returned.
      # NOTE(review): handle_error is defined outside this file; presumably
      # it raises for non-success codes -- confirm.
      #
      # @param topic the topic name to look up
      # @param partition the partition id within that topic
      # @return the stored offset, or -1 when the partition has no entry
      def offset_for(topic, partition)
        partitions_for_topic = topics.fetch(topic)
        info = partitions_for_topic.fetch(partition, nil)
        return -1 unless info

        Protocol.handle_error(info.error_code)
        info.offset
      end

      # Builds a response by reading from the given decoder.
      #
      # Read order: an array of topics; each topic is a string name followed
      # by an array of partitions; each partition is an int32 id, an int64
      # offset, a string of metadata and an int16 error code.
      #
      # @param decoder object providing #array, #string, #int16, #int32 and
      #   #int64 readers
      # @return [OffsetFetchResponse]
      def self.decode(decoder)
        topic_pairs = decoder.array do
          topic_name = decoder.string

          partition_pairs = decoder.array do
            partition_id = decoder.int32

            # The three reads below must stay in this exact order; it is the
            # order in which the fields are consumed from the decoder.
            offset = decoder.int64
            metadata = decoder.string
            error_code = decoder.int16

            details = PartitionOffsetInfo.new(
              offset: offset,
              metadata: metadata,
              error_code: error_code
            )

            [partition_id, details]
          end

          [topic_name, Hash[partition_pairs]]
        end

        new(topics: Hash[topic_pairs])
      end
    end
  end
end
Version data entries
55 entries across 55 versions & 1 rubygems