Sha256: 5e3dca1638d4c7018e8f0d630623153c906cc9f0e4ffbf759bc3395e33925581

Contents?: true

Size: 1.23 KB

Versions: 7

Compression:

Stored size: 1.23 KB

Contents

# frozen_string_literal: true

module Karafka
  module Messages
    module Builders
      # Builder for creating message batch instances.
      module Messages
        class << self
          # Creates messages batch with messages inside based on the incoming messages and the
          # topic from which it comes.
          #
          # @param kafka_messages [Array<Rdkafka::Consumer::Message>] raw fetched messages
          # @param topic [Karafka::Routing::Topic] topic for which we're received messages
          # @param received_at [Time] moment in time when the messages were received
          # @return [Karafka::Messages::Messages] messages batch object
          def call(kafka_messages, topic, received_at)
            messages_array = kafka_messages.map do |message|
              Karafka::Messages::Builders::Message.call(
                message,
                topic,
                received_at
              )
            end

            metadata = BatchMetadata.call(
              kafka_messages,
              topic,
              received_at
            ).freeze

            Karafka::Messages::Messages.new(
              messages_array,
              metadata
            ).freeze
          end
        end
      end
    end
  end
end

Version data entries

7 entries across 7 versions & 1 rubygems

Version Path
karafka-2.0.0.beta1 lib/karafka/messages/builders/messages.rb
karafka-2.0.0.alpha6 lib/karafka/messages/builders/messages.rb
karafka-2.0.0.alpha5 lib/karafka/messages/builders/messages.rb
karafka-2.0.0.alpha4 lib/karafka/messages/builders/messages.rb
karafka-2.0.0.alpha3 lib/karafka/messages/builders/messages.rb
karafka-2.0.0.alpha2 lib/karafka/messages/builders/messages.rb
karafka-2.0.0.alpha1 lib/karafka/messages/builders/messages.rb