Sha256: f71f2ceba6350178cea948ce97fc31151487e2cafc1841deb9fac0d5594e4a08

Contents?: true

Size: 1.81 KB

Versions: 20

Compression:

Stored size: 1.81 KB

Contents

module Rdkafka
  class Consumer
    # A message headers
    class Headers
      # Reads a native kafka's message header into ruby's hash
      #
      # @return [Hash<String, String>] a message headers
      #
      # @raise [Rdkafka::RdkafkaError] when fail to read headers
      #
      # @private
      def self.from_native(native_message)
        headers_ptrptr = FFI::MemoryPointer.new(:pointer)
        err = Rdkafka::Bindings.rd_kafka_message_headers(native_message, headers_ptrptr)

        if err == Rdkafka::Bindings::RD_KAFKA_RESP_ERR__NOENT
          return {}
        elsif err != Rdkafka::Bindings::RD_KAFKA_RESP_ERR_NO_ERROR
          raise Rdkafka::RdkafkaError.new(err, "Error reading message headers")
        end

        headers_ptr = headers_ptrptr.read_pointer

        name_ptrptr = FFI::MemoryPointer.new(:pointer)
        value_ptrptr = FFI::MemoryPointer.new(:pointer)
        size_ptr = Rdkafka::Bindings::SizePtr.new
        headers = {}

        idx = 0
        loop do
          err = Rdkafka::Bindings.rd_kafka_header_get_all(
            headers_ptr,
            idx,
            name_ptrptr,
            value_ptrptr,
            size_ptr
          )

          if err == Rdkafka::Bindings::RD_KAFKA_RESP_ERR__NOENT
            break
          elsif err != Rdkafka::Bindings::RD_KAFKA_RESP_ERR_NO_ERROR
            raise Rdkafka::RdkafkaError.new(err, "Error reading a message header at index #{idx}")
          end

          name_ptr = name_ptrptr.read_pointer
          name = name_ptr.respond_to?(:read_string_to_null) ? name_ptr.read_string_to_null : name_ptr.read_string

          size = size_ptr[:value]

          value_ptr = value_ptrptr.read_pointer

          value = value_ptr.read_string(size)

          headers[name.to_sym] = value

          idx += 1
        end

        headers
      end
    end
  end
end

Version data entries

20 entries across 20 versions & 2 rubygems

Version Path
rdkafka-0.12.1 lib/rdkafka/consumer/headers.rb
karafka-rdkafka-0.12.4 lib/rdkafka/consumer/headers.rb
karafka-rdkafka-0.12.3 lib/rdkafka/consumer/headers.rb
karafka-rdkafka-0.12.2 lib/rdkafka/consumer/headers.rb
karafka-rdkafka-0.12.1 lib/rdkafka/consumer/headers.rb
karafka-rdkafka-0.12.1.beta1 lib/rdkafka/consumer/headers.rb
karafka-rdkafka-0.12.0 lib/rdkafka/consumer/headers.rb
rdkafka-0.12.0 lib/rdkafka/consumer/headers.rb
rdkafka-0.12.0.beta.4 lib/rdkafka/consumer/headers.rb
rdkafka-0.12.0.beta.3 lib/rdkafka/consumer/headers.rb
rdkafka-0.12.0.beta.2 lib/rdkafka/consumer/headers.rb
rdkafka-0.12.0.beta.1 lib/rdkafka/consumer/headers.rb
rdkafka-0.12.0.beta.0 lib/rdkafka/consumer/headers.rb
rdkafka-0.11.1 lib/rdkafka/consumer/headers.rb
rdkafka-0.11.0 lib/rdkafka/consumer/headers.rb
rdkafka-0.10.0 lib/rdkafka/consumer/headers.rb
rdkafka-0.9.0 lib/rdkafka/consumer/headers.rb
rdkafka-0.8.1 lib/rdkafka/consumer/headers.rb
rdkafka-0.8.0 lib/rdkafka/consumer/headers.rb
rdkafka-0.8.0.beta.1 lib/rdkafka/consumer/headers.rb