lib/karafka/params/params_batch.rb in karafka-1.2.13 vs lib/karafka/params/params_batch.rb in karafka-1.3.0.rc1
- old
+ new
@@ -1,46 +1,61 @@
# frozen_string_literal: true
module Karafka
module Params
# Params batch represents a set of messages received from Kafka.
- # @note Params internally are lazy loaded before first use. That way we can skip parsing
- # process if we have after_fetch that rejects some incoming messages without using params
- # It can be also used when handling really heavy data (in terms of parsing).
+ # @note Params internally are lazy loaded before first use. That way we can skip
+ # deserialization process if we have after_fetch that rejects some incoming messages
+ # without using params It can be also used when handling really heavy data.
class ParamsBatch
include Enumerable
- # Builds up a params batch based on raw kafka messages
- # @param messages_batch [Array<Kafka::FetchedMessage>] messages batch
- # @param topic_parser [Class] topic parser for unparsing messages values
- def initialize(messages_batch, topic_parser)
- @params_batch = messages_batch.map! do |message|
- Karafka::Params::Params.build(message, topic_parser)
- end
+ # @param params_array [Array<Karafka::Params::Params>] array with karafka params
+ # @return [Karafka::Params::ParamsBatch] lazy evaluated params batch object
+ def initialize(params_array)
+ @params_array = params_array
end
- # @yieldparam [Karafka::Params::Params] each parsed and loaded params instance
- # @note Invocation of this method will cause loading and parsing each param after another.
- # If you want to get access without parsing, please access params_batch directly
+ # @yieldparam [Karafka::Params::Params] each deserialized and loaded params instance
+ # @note Invocation of this method will cause loading and deserializing each param after
+ # another. If you want to get access without deserializing, please access params_array
+ # directly
def each
- @params_batch.each { |param| yield(param.retrieve!) }
+ @params_array.each { |param| yield(param.deserialize!) }
end
# @return [Array<Karafka::Params::Params>] returns all the params in a loaded state, so they
# can be used for batch insert, etc. Without invoking all, up until first use, they won't
- # be parsed
- def parsed
+ # be deserialized
+ def deserialize!
each(&:itself)
end
- # @return [Karafka::Params::Params] last element after the unparsing process
+ # @return [Array<Object>] array with deserialized payloads. This method can be useful when
+ # we don't care about metadata and just want to extract all the data payloads from the
+ # batch
+ def payloads
+ deserialize!.map(&:payload)
+ end
+
+ # @return [Karafka::Params::Params] first element after the deserialization process
+ def first
+ @params_array.first.deserialize!
+ end
+
+ # @return [Karafka::Params::Params] last element after the deserialization process
def last
- @params_batch.last.retrieve!
+ @params_array.last.deserialize!
end
- # @return [Array<Karafka::Params::Params>] pure array with params (not parsed)
+ # @return [Array<Karafka::Params::Params>] pure array with params (not deserialized)
def to_a
- @params_batch
+ @params_array
+ end
+
+ # @return [Integer] number of messages in the batch
+ def size
+ @params_array.size
end
end
end
end