Sha256: fac15272fdfd3886374cab8f4c73de7a755a0f3f8ad2264a41f063bb323e2665

Contents?: true

Size: 1.67 KB

Versions: 21

Compression:

Stored size: 1.67 KB

Contents

# frozen_string_literal: true

module Karafka
  module Messages
    module Builders
      # Builder for creating a batch metadata object based on the batch information.
      module BatchMetadata
        class << self
          # Creates metadata based on the Kafka batch data.
          #
          # @param messages [Array<Karafka::Messages::Message>] messages array
          # @param topic [Karafka::Routing::Topic] topic for which we've fetched the batch
          # @param partition [Integer] partition of this metadata
          # @param scheduled_at [Time] moment when the batch was scheduled for processing
          # @return [Karafka::Messages::BatchMetadata] batch metadata object
          #
          # @note We do not set `processed_at` as this needs to be assigned when the batch is
          #   picked up for processing.
          def call(messages, topic, partition, scheduled_at)
            Karafka::Messages::BatchMetadata.new(
              size: messages.count,
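              # For an empty batch both offsets fall back to -1001, the same value librdkafka
              # uses for RD_KAFKA_OFFSET_INVALID
              first_offset: messages.first&.offset || -1001,
              last_offset: messages.last&.offset || -1001,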
              deserializer: topic.deserializer,
              partition: partition,
              topic: topic.name,
              # We assume the batch creation time is the creation time of its last message.
              # For an empty batch this is nil.
              created_at: messages.last&.timestamp,
              # When this batch was built and scheduled for execution
              scheduled_at: scheduled_at,
              # This needs to be set to a correct value before processing starts
              processed_at: nil
            )
          end
        end
      end
    end
  end
end
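
The sketch below illustrates how the builder behaves for a regular batch and for an empty one. It does not load Karafka: `StubMessage`, `StubTopic`, `StubBatchMetadata` and `StubBatchMetadataBuilder` are hypothetical stand-ins that only mirror the fields used in the file above, so treat it as an approximation of the logic rather than the gem's actual classes.

```ruby
# Hypothetical stand-ins for the Karafka classes (not the real implementations).
StubMessage = Struct.new(:offset, :timestamp, keyword_init: true)
StubTopic = Struct.new(:name, :deserializer, keyword_init: true)
StubBatchMetadata = Struct.new(
  :size, :first_offset, :last_offset, :deserializer, :partition, :topic,
  :created_at, :scheduled_at, :processed_at,
  keyword_init: true
)

# Mirrors the field assignments of the builder shown above.
module StubBatchMetadataBuilder
  def self.call(messages, topic, partition, scheduled_at)
    StubBatchMetadata.new(
      size: messages.count,
      # Empty batches fall back to -1001 for both offsets
      first_offset: messages.first&.offset || -1001,
      last_offset: messages.last&.offset || -1001,
      deserializer: topic.deserializer,
      partition: partition,
      topic: topic.name,
      # Creation time of the last message, or nil for an empty batch
      created_at: messages.last&.timestamp,
      scheduled_at: scheduled_at,
      # Left unset here; assigned once the batch is picked up for processing
      processed_at: nil
    )
  end
end

# Assumed sample data: topic name, deserializer and offsets are made up.
topic = StubTopic.new(name: 'events', deserializer: ->(message) { message })
now = Time.now
messages = [
  StubMessage.new(offset: 10, timestamp: now - 2),
  StubMessage.new(offset: 11, timestamp: now - 1)
]

metadata = StubBatchMetadataBuilder.call(messages, topic, 0, now)
empty = StubBatchMetadataBuilder.call([], topic, 0, now)

puts "size=#{metadata.size} offsets=#{metadata.first_offset}..#{metadata.last_offset}"
puts "empty batch offsets=#{empty.first_offset}..#{empty.last_offset}"
```

The empty-batch call exercises the `&.` safe-navigation fallbacks: with no messages, both offsets come back as -1001 and `created_at` is nil, while `scheduled_at` is still the value passed in.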

Version data entries

21 entries across 21 versions & 1 rubygems

Version Path
karafka-2.2.3 lib/karafka/messages/builders/batch_metadata.rb
karafka-2.2.2 lib/karafka/messages/builders/batch_metadata.rb
karafka-2.2.1 lib/karafka/messages/builders/batch_metadata.rb
karafka-2.2.0 lib/karafka/messages/builders/batch_metadata.rb
karafka-2.1.13 lib/karafka/messages/builders/batch_metadata.rb
karafka-2.1.12 lib/karafka/messages/builders/batch_metadata.rb
karafka-2.1.11 lib/karafka/messages/builders/batch_metadata.rb
karafka-2.1.10 lib/karafka/messages/builders/batch_metadata.rb
karafka-2.1.9 lib/karafka/messages/builders/batch_metadata.rb
karafka-2.1.8 lib/karafka/messages/builders/batch_metadata.rb
karafka-2.1.7 lib/karafka/messages/builders/batch_metadata.rb
karafka-2.1.6 lib/karafka/messages/builders/batch_metadata.rb
karafka-2.1.5 lib/karafka/messages/builders/batch_metadata.rb
karafka-2.1.5.beta1 lib/karafka/messages/builders/batch_metadata.rb
karafka-2.1.4 lib/karafka/messages/builders/batch_metadata.rb
karafka-2.1.3 lib/karafka/messages/builders/batch_metadata.rb
karafka-2.1.2 lib/karafka/messages/builders/batch_metadata.rb
karafka-2.1.1 lib/karafka/messages/builders/batch_metadata.rb
karafka-2.1.0 lib/karafka/messages/builders/batch_metadata.rb
karafka-2.0.41 lib/karafka/messages/builders/batch_metadata.rb