lib/karafka/pro/scheduled_messages/contracts/message.rb in karafka-2.4.10 vs lib/karafka/pro/scheduled_messages/contracts/message.rb in karafka-2.4.11
- old
+ new
@@ -52,9 +52,25 @@
# in the past in general as often it is a source of errors
next if epoch_time >= Time.now.to_i - 10
[[[:headers], :schedule_target_epoch_in_the_past]]
end
+
+ # Makes sure, that the target envelope topic we dispatch to is a scheduled messages topic
+ virtual do |data, errors|
+ next unless errors.empty?
+
+ scheduled_topics = Karafka::App
+ .routes
+ .flat_map(&:topics)
+ .flat_map(&:to_a)
+ .select(&:scheduled_messages?)
+ .map(&:name)
+
+ next if scheduled_topics.include?(data[:topic].to_s)
+
+ [[[:topic], :not_a_scheduled_messages_topic]]
+ end
end
end
end
end
end