# frozen_string_literal: true

require "stringio"

module Kafka
  module Protocol

    # A produce request sends a message set to the server.
    #
    # ## API Specification
    #
    #     ProduceRequest => RequiredAcks Timeout [TopicName [Partition MessageSetSize MessageSet]]
    #         RequiredAcks => int16
    #         Timeout => int32
    #         Partition => int32
    #         MessageSetSize => int32
    #
    #     MessageSet => [Offset MessageSize Message]
    #         Offset => int64
    #         MessageSize => int32
    #
    #     Message => Crc MagicByte Attributes Key Value
    #         Crc => int32
    #         MagicByte => int8
    #         Attributes => int8
    #         Key => bytes
    #         Value => bytes
    #
    class ProduceRequest
      attr_reader :required_acks, :timeout, :messages_for_topics

      # @param required_acks [Integer]
      # @param timeout [Integer]
      # @param messages_for_topics [Hash]
      def initialize(required_acks:, timeout:, messages_for_topics:)
        @required_acks = required_acks
        @timeout = timeout
        @messages_for_topics = messages_for_topics
      end

      def api_key
        PRODUCE_API
      end

      def api_version
        2
      end

      def response_class
        requires_acks? ? Protocol::ProduceResponse : nil
      end

      # Whether this request requires any acknowledgements at all. If no acknowledgements
      # are required, the server will not send back a response at all.
      #
      # @return [Boolean] true if acknowledgements are required, false otherwise.
      def requires_acks?
        @required_acks != 0
      end

      def encode(encoder)
        encoder.write_int16(@required_acks)
        encoder.write_int32(@timeout)

        encoder.write_array(@messages_for_topics) do |topic, messages_for_partition|
          encoder.write_string(topic)

          encoder.write_array(messages_for_partition) do |partition, message_set|
            encoder.write_int32(partition)

            # When encoding the message set into the request, the bytesize of the message
            # set must precede the actual data. Therefore we need to encode the entire
            # message set into a separate buffer first.
            encoded_message_set = Encoder.encode_with(message_set)

            encoder.write_bytes(encoded_message_set)
          end
        end
      end
    end
  end
end