lib/karafka/pro/processing/piping/consumer.rb in karafka-2.4.0.rc1 vs lib/karafka/pro/processing/piping/consumer.rb in karafka-2.4.0

- old
+ new

@@ -34,13 +34,13 @@ # @param message [Karafka::Messages::Message] original message to pipe # # @note It will NOT deserialize the payload so it is fast # # @note We assume that there can be different number of partitions in the target topic, - # this is why we use `key` based on the original topic partition number and not the - # partition id itself. This will not utilize partitions beyond the number of partitions - # of original topic, but will accommodate for topics with less partitions. + # this is why we use `key` based on the original topic key and not the partition id. + # This will not utilize partitions beyond the number of partitions of original topic, + # but will accommodate for topics with less partitions. def pipe_async(topic:, message:) produce_async( build_pipe_message(topic: topic, message: message) ) end @@ -92,22 +92,28 @@ # @param message [Karafka::Messages::Message] original message to pipe # @return [Hash] hash with message to pipe. # # @note If you need to alter this, please define the `#enhance_pipe_message` method def build_pipe_message(topic:, message:) - original_partition = message.partition.to_s - pipe_message = { topic: topic, - key: original_partition, payload: message.raw_payload, - headers: message.headers.merge( + headers: message.raw_headers.merge( 'original_topic' => message.topic, - 'original_partition' => original_partition, + 'original_partition' => message.partition.to_s, 'original_offset' => message.offset.to_s, 'original_consumer_group' => self.topic.consumer_group.id ) } + + # Use a key only if key was provided + if message.raw_key + pipe_message[:key] = message.raw_key + # Otherwise pipe creating a key that will assign it based on the original partition + # number + else + pipe_message[:key] = message.partition.to_s + end # Optional method user can define in consumer to enhance the dlq message hash with # some extra details if needed or to replace payload, etc if respond_to?(:enhance_pipe_message, true) enhance_pipe_message(