Sha256: 93e66f6644e0127555c095f51339498973ab5b8ab3764d4e1b846c3fd856a847
Contents?: true
Size: 1.84 KB
Versions: 3
Compression:
Stored size: 1.84 KB
Contents
module Rdkafka class Consumer # A message headers class Headers # Reads a native kafka's message header into ruby's hash # # @return [Hash<String, String>] a message headers # # @raise [Rdkafka::RdkafkaError] when fail to read headers # # @private def self.from_native(native_message) headers_ptrptr = FFI::MemoryPointer.new(:pointer) err = Rdkafka::Bindings.rd_kafka_message_headers(native_message, headers_ptrptr) if err == Rdkafka::Bindings::RD_KAFKA_RESP_ERR__NOENT return {} elsif err != Rdkafka::Bindings::RD_KAFKA_RESP_ERR_NO_ERROR raise Rdkafka::RdkafkaError.new(err, "Error reading message headers") end headers_ptr = headers_ptrptr.read(:pointer).tap { |it| it.autorelease = false } name_ptrptr = FFI::MemoryPointer.new(:pointer) value_ptrptr = FFI::MemoryPointer.new(:pointer) size_ptr = Rdkafka::Bindings::SizePtr.new headers = {} idx = 0 loop do err = Rdkafka::Bindings.rd_kafka_header_get_all( headers_ptr, idx, name_ptrptr, value_ptrptr, size_ptr ) if err == Rdkafka::Bindings::RD_KAFKA_RESP_ERR__NOENT break elsif err != Rdkafka::Bindings::RD_KAFKA_RESP_ERR_NO_ERROR raise Rdkafka::RdkafkaError.new(err, "Error reading a message header at index #{idx}") end name = name_ptrptr.read(:pointer).tap { |it| it.autorelease = false } name = name.read_string_to_null size = size_ptr[:value] value = value_ptrptr.read(:pointer).tap { |it| it.autorelease = false } value = value.read_string(size) headers[name.to_sym] = value idx += 1 end headers end end end end
Version data entries
3 entries across 3 versions & 1 rubygems
Version | Path |
---|---|
rdkafka-0.7.0 | lib/rdkafka/consumer/headers.rb |
rdkafka-0.6.0 | lib/rdkafka/consumer/headers.rb |
rdkafka-0.5.0 | lib/rdkafka/consumer/headers.rb |