lib/karafka/pro/scheduled_messages/contracts/message.rb in karafka-2.4.10 vs lib/karafka/pro/scheduled_messages/contracts/message.rb in karafka-2.4.11

- old
+ new

@@ -52,9 +52,25 @@ # in the past in general as often it is a source of errors next if epoch_time >= Time.now.to_i - 10 [[[:headers], :schedule_target_epoch_in_the_past]] end + + # Makes sure, that the target envelope topic we dispatch to is a scheduled messages topic + virtual do |data, errors| + next unless errors.empty? + + scheduled_topics = Karafka::App + .routes + .flat_map(&:topics) + .flat_map(&:to_a) + .select(&:scheduled_messages?) + .map(&:name) + + next if scheduled_topics.include?(data[:topic].to_s) + + [[[:topic], :not_a_scheduled_messages_topic]] + end end end end end end