Sha256: a1376be0fa08e0d825753ec119c7f862bc0ac4a8be11edfca019bddfea4d8a5f

Contents?: true

Size: 1.48 KB

Versions: 1

Compression:

Stored size: 1.48 KB

Contents

module Rdkafka
  class Consumer
    # A message that was consumed from a topic.
    class Message
      # The topic this message was consumed from
      # @return [String]
      attr_reader :topic

      # The partition this message was consumed from
      # @return [Integer]
      attr_reader :partition

      # This message's payload
      # @return [String, nil]
      attr_reader :payload

      # This message's key
      # @return [String, nil]
      attr_reader :key

      # This message's offset in it's partition
      # @return [Integer]
      attr_reader :offset

      # This message's timestamp, if provided by the broker
      # @return [Integer, nil]
      attr_reader :timestamp

      # @private
      def initialize(native_message)
        unless native_message[:rkt].null?
          @topic = FFI.rd_kafka_topic_name(native_message[:rkt])
        end
        @partition = native_message[:partition]
        unless native_message[:payload].null?
          @payload = native_message[:payload].read_string(native_message[:len])
        end
        unless native_message[:key].null?
          @key = native_message[:key].read_string(native_message[:key_len])
        end
        @offset = native_message[:offset]
        @timestamp = FFI.rd_kafka_message_timestamp(native_message, nil)
      end

      # @return [String]
      def to_s
        "Message in '#{topic}' with key '#{key}', payload '#{payload}', partition #{partition}, offset #{offset}, timestamp #{timestamp}"
      end
    end
  end
end

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
rdkafka-0.2.0 lib/rdkafka/consumer/message.rb