Sha256: dd0d67fe07d6b54e695c9d4d84455dd9057455c668390856725ea2e2c840227e

Contents?: true

Size: 1.96 KB

Versions: 11

Compression:

Stored size: 1.96 KB

Contents

# frozen_string_literal: true

module Karafka
  module Messages
    # Messages batch represents a set of messages received from Kafka of a single topic partition.
    #
    # It is a thin, enumerable wrapper around an array of messages plus the batch metadata.
    # All enumeration goes through {#each}, so every `Enumerable` method is available.
    class Messages
      include Enumerable

      # @return [Karafka::Messages::BatchMetadata] metadata given at initialization
      attr_reader :metadata

      # @param messages_array [Array<Karafka::Messages::Message>] array with karafka messages
      # @param metadata [Karafka::Messages::BatchMetadata]
      # @return [Karafka::Messages::Messages] lazy evaluated messages batch object
      def initialize(messages_array, metadata)
        @messages_array = messages_array
        @metadata = metadata
      end

      # Iterates over the wrapped messages in their stored order.
      #
      # @param block [Proc] block we want to execute per each message
      # @return [Array<Karafka::Messages::Message>, Enumerator] the underlying array when a block
      #   is given, an enumerator otherwise (same as `Array#each`)
      # @note Invocation of this method will not cause loading and deserializing of messages.
      def each(&block)
        @messages_array.each(&block)
      end

      # Runs deserialization of all the messages (by requesting the payload of each one) and
      # returns them
      # @return [Array<Karafka::Messages::Message>]
      def deserialize!
        each(&:payload)
      end

      # @return [Array<Object>] array with deserialized payloads. This method can be useful when
      #   we don't care about metadata and just want to extract all the data payloads from the
      #   batch
      def payloads
        map(&:payload)
      end

      # @return [Array<String>] array with raw, not deserialized payloads
      def raw_payloads
        map(&:raw_payload)
      end

      # @return [Boolean] is the messages batch empty
      def empty?
        @messages_array.empty?
      end

      # Returns the first message or, when a count is given, the first `n` messages.
      #
      # The optional count keeps this override compatible with `Enumerable#first(n)`, which this
      # method shadows because the class includes `Enumerable`.
      #
      # @param args [Array<Integer>] optional single count of messages to return
      # @return [Karafka::Messages::Message, nil] first message (nil when the batch is empty)
      #   when called without arguments
      # @return [Array<Karafka::Messages::Message>] up to `n` first messages when a count is given
      def first(*args)
        @messages_array.first(*args)
      end

      # Returns the last message or, when a count is given, the last `n` messages.
      #
      # @param args [Array<Integer>] optional single count of messages to return
      # @return [Karafka::Messages::Message, nil] last message (nil when the batch is empty)
      #   when called without arguments
      # @return [Array<Karafka::Messages::Message>] up to `n` last messages when a count is given
      def last(*args)
        @messages_array.last(*args)
      end

      # @return [Integer] number of messages in the batch
      def size
        @messages_array.size
      end

      # @return [Array<Karafka::Messages::Message>] pure array with messages
      # @note This is the underlying array itself, not a copy
      def to_a
        @messages_array
      end
    end
  end
end

Version data entries

11 entries across 11 versions & 1 rubygems

Version Path
karafka-2.1.7 lib/karafka/messages/messages.rb
karafka-2.1.6 lib/karafka/messages/messages.rb
karafka-2.1.5 lib/karafka/messages/messages.rb
karafka-2.1.5.beta1 lib/karafka/messages/messages.rb
karafka-2.1.4 lib/karafka/messages/messages.rb
karafka-2.1.3 lib/karafka/messages/messages.rb
karafka-2.1.2 lib/karafka/messages/messages.rb
karafka-2.1.1 lib/karafka/messages/messages.rb
karafka-2.1.0 lib/karafka/messages/messages.rb
karafka-2.0.41 lib/karafka/messages/messages.rb
karafka-2.0.40 lib/karafka/messages/messages.rb