Sha256: 349493fcb0fec631c436d7ca4ae339047696d33e828b26125c5888db880b84f8
Contents?: true
Size: 1.08 KB
Versions: 1
Compression:
Stored size: 1.08 KB
Contents
module Kafka module Protocol class OffsetFetchResponse class PartitionOffsetInfo attr_reader :offset, :metadata, :error_code def initialize(offset:, metadata:, error_code:) @offset = offset @metadata = metadata @error_code = error_code end end attr_reader :topics def initialize(topics:) @topics = topics end def offset_for(topic, partition) offset_info = topics.fetch(topic).fetch(partition) Protocol.handle_error(offset_info.error_code) offset_info.offset end def self.decode(decoder) topics = decoder.array { topic = decoder.string partitions = decoder.array { partition = decoder.int32 info = PartitionOffsetInfo.new( offset: decoder.int64, metadata: decoder.string, error_code: decoder.int16, ) [partition, info] } [topic, Hash[partitions]] } new(topics: Hash[topics]) end end end end
Version data entries
1 entries across 1 versions & 1 rubygems
Version | Path |
---|---|
ruby-kafka-0.2.0 | lib/kafka/protocol/offset_fetch_response.rb |