Sha256: d6c793199f9a66b6a2fa492f558465fcd5a6ac4ffffcec02ee9246814b28a16e

Contents?: true

Size: 1.91 KB

Versions: 39

Compression:

Stored size: 1.91 KB

Contents

require "kafka/protocol/message_set"

module Kafka
  module Protocol

    # A response to a fetch request.
    #
    # ## API Specification
    #
    #     FetchResponse => [TopicName [Partition ErrorCode HighwaterMarkOffset MessageSetSize MessageSet]]
    #       TopicName => string
    #       Partition => int32
    #       ErrorCode => int16
    #       HighwaterMarkOffset => int64
    #       MessageSetSize => int32
    #
    class FetchResponse
      # Per-partition slice of a fetch response: the partition id, the error
      # code, the highwater mark offset and the list of messages that were
      # extracted from the partition's message set.
      class FetchedPartition
        attr_reader :partition
        attr_reader :error_code
        attr_reader :highwater_mark_offset
        attr_reader :messages

        # All four values are required keyword arguments and are stored
        # unchanged; they are exposed through the readers above.
        def initialize(partition:, error_code:, highwater_mark_offset:, messages:)
          @partition = partition
          @error_code = error_code
          @highwater_mark_offset = highwater_mark_offset
          @messages = messages
        end
      end

      # Groups the fetched partitions that belong to one topic name.
      class FetchedTopic
        attr_reader :name
        attr_reader :partitions

        # name       - the topic name read from the response.
        # partitions - the collection of FetchedPartition objects for it.
        def initialize(name:, partitions:)
          @name = name
          @partitions = partitions
        end
      end

      # The FetchedTopic entries carried by this response.
      attr_reader :topics

      # topics - collection of FetchedTopic objects; defaults to an empty
      #          array when omitted.
      def initialize(topics: [])
        @topics = topics
      end

      # Builds a FetchResponse by reading from `decoder`.
      #
      # Values are consumed strictly in the order of the API specification:
      # an array of topics, each holding a name followed by an array of
      # partitions, each holding an int32 partition id, an int16 error code,
      # an int64 highwater mark offset and the message set bytes.
      #
      # decoder - an object responding to `array`, `string`, `int32`,
      #           `int16`, `int64` and `bytes`.
      #
      # Returns a new FetchResponse.
      def self.decode(decoder)
        fetched_topics = decoder.array do
          name = decoder.string

          fetched_partitions = decoder.array do
            partition_id = decoder.int32
            code = decoder.int16
            highwater = decoder.int64

            # The message set is handed over as a blob of bytes, so it is
            # parsed with a dedicated decoder over just that blob.
            raw_message_set = decoder.bytes
            messages = MessageSet.decode(Decoder.from_string(raw_message_set)).messages

            FetchedPartition.new(
              partition: partition_id,
              error_code: code,
              highwater_mark_offset: highwater,
              messages: messages,
            )
          end

          FetchedTopic.new(name: name, partitions: fetched_partitions)
        end

        new(topics: fetched_topics)
      end
    end
  end
end

Version data entries

39 entries across 39 versions & 1 rubygems

Version Path
ruby-kafka-0.4.4 lib/kafka/protocol/fetch_response.rb
ruby-kafka-0.4.3 lib/kafka/protocol/fetch_response.rb
ruby-kafka-0.4.2 lib/kafka/protocol/fetch_response.rb
ruby-kafka-0.4.1 lib/kafka/protocol/fetch_response.rb
ruby-kafka-0.4.0 lib/kafka/protocol/fetch_response.rb
ruby-kafka-0.4.0.beta1 lib/kafka/protocol/fetch_response.rb
ruby-kafka-0.3.18.beta2 lib/kafka/protocol/fetch_response.rb
ruby-kafka-0.3.18.beta1 lib/kafka/protocol/fetch_response.rb
ruby-kafka-0.3.17 lib/kafka/protocol/fetch_response.rb
ruby-kafka-0.3.16 lib/kafka/protocol/fetch_response.rb
ruby-kafka-0.3.16.beta2 lib/kafka/protocol/fetch_response.rb
ruby-kafka-0.3.16.beta1 lib/kafka/protocol/fetch_response.rb
ruby-kafka-0.3.15 lib/kafka/protocol/fetch_response.rb
ruby-kafka-0.3.15.beta3 lib/kafka/protocol/fetch_response.rb
ruby-kafka-0.3.15.beta2 lib/kafka/protocol/fetch_response.rb
ruby-kafka-0.3.15.beta1 lib/kafka/protocol/fetch_response.rb
ruby-kafka-0.3.14 lib/kafka/protocol/fetch_response.rb
ruby-kafka-0.3.13.beta4 lib/kafka/protocol/fetch_response.rb
ruby-kafka-0.3.13.beta3 lib/kafka/protocol/fetch_response.rb
ruby-kafka-0.3.13.beta2 lib/kafka/protocol/fetch_response.rb