lib/karafka/pro/processing/piping/consumer.rb in karafka-2.4.0.rc1 vs lib/karafka/pro/processing/piping/consumer.rb in karafka-2.4.0
- old
+ new
@@ -34,13 +34,13 @@
# @param message [Karafka::Messages::Message] original message to pipe
#
# @note It will NOT deserialize the payload so it is fast
#
# @note We assume that there can be different number of partitions in the target topic,
- # this is why we use `key` based on the original topic partition number and not the
- # partition id itself. This will not utilize partitions beyond the number of partitions
- # of original topic, but will accommodate for topics with less partitions.
+ # this is why we use `key` based on the original topic key and not the partition id.
+ # This will not utilize partitions beyond the number of partitions of original topic,
+ # but will accommodate for topics with less partitions.
def pipe_async(topic:, message:)
produce_async(
build_pipe_message(topic: topic, message: message)
)
end
@@ -92,22 +92,28 @@
# @param message [Karafka::Messages::Message] original message to pipe
# @return [Hash] hash with message to pipe.
#
# @note If you need to alter this, please define the `#enhance_pipe_message` method
def build_pipe_message(topic:, message:)
- original_partition = message.partition.to_s
-
pipe_message = {
topic: topic,
- key: original_partition,
payload: message.raw_payload,
- headers: message.headers.merge(
+ headers: message.raw_headers.merge(
'original_topic' => message.topic,
- 'original_partition' => original_partition,
+ 'original_partition' => message.partition.to_s,
'original_offset' => message.offset.to_s,
'original_consumer_group' => self.topic.consumer_group.id
)
}
+
+ # Use a key only if key was provided
+ if message.raw_key
+ pipe_message[:key] = message.raw_key
+ # Otherwise pipe creating a key that will assign it based on the original partition
+ # number
+ else
+ pipe_message[:key] = message.partition.to_s
+ end
# Optional method user can define in consumer to enhance the dlq message hash with
# some extra details if needed or to replace payload, etc
if respond_to?(:enhance_pipe_message, true)
enhance_pipe_message(