Sha256: aae5c98108cbcd4eaa69f629eb81a574f6ebf25ad8057bfe51b44bf7dbe74dfb
Contents?: true
Size: 1.18 KB
Versions: 49
Compression:
Stored size: 1.18 KB
Contents
# frozen_string_literal: true module Kafka module Protocol class OffsetFetchResponse class PartitionOffsetInfo attr_reader :offset, :metadata, :error_code def initialize(offset:, metadata:, error_code:) @offset = offset @metadata = metadata @error_code = error_code end end attr_reader :topics def initialize(topics:) @topics = topics end def offset_for(topic, partition) offset_info = topics.fetch(topic).fetch(partition, nil) if offset_info Protocol.handle_error(offset_info.error_code) offset_info.offset else -1 end end def self.decode(decoder) topics = decoder.array { topic = decoder.string partitions = decoder.array { partition = decoder.int32 info = PartitionOffsetInfo.new( offset: decoder.int64, metadata: decoder.string, error_code: decoder.int16, ) [partition, info] } [topic, Hash[partitions]] } new(topics: Hash[topics]) end end end end
Version data entries
49 entries across 49 versions & 4 rubygems