# frozen_string_literal: true module Karafka module Contracts # Contract with validation rules for Karafka configuration details. # # @note There are many more configuration options inside of the # `Karafka::Setup::Config` model, but we don't validate them here as they are # validated per each route (topic + consumer_group) because they can be overwritten, # so we validate all of that once all the routes are defined and ready. class Config < Base configure do |config| config.error_messages = YAML.safe_load( File.read( File.join(Karafka.gem_root, 'config', 'locales', 'errors.yml') ) ).fetch('en').fetch('validations').fetch('config') end # License validity happens in the licenser. Here we do only the simple consistency checks nested(:license) do required(:token) { |val| [true, false].include?(val) || val.is_a?(String) } required(:entity) { |val| val.is_a?(String) } end required(:client_id) { |val| val.is_a?(String) && Contracts::TOPIC_REGEXP.match?(val) } required(:concurrency) { |val| val.is_a?(Integer) && val.positive? } required(:consumer_mapper) { |val| !val.nil? } required(:consumer_persistence) { |val| [true, false].include?(val) } required(:pause_timeout) { |val| val.is_a?(Integer) && val.positive? } required(:pause_max_timeout) { |val| val.is_a?(Integer) && val.positive? } required(:pause_with_exponential_backoff) { |val| [true, false].include?(val) } required(:shutdown_timeout) { |val| val.is_a?(Integer) && val.positive? } required(:max_wait_time) { |val| val.is_a?(Integer) && val.positive? } required(:kafka) { |val| val.is_a?(Hash) && !val.empty? } nested(:admin) do # Can be empty because inherits values from the root kafka required(:kafka) { |val| val.is_a?(Hash) } required(:group_id) { |val| val.is_a?(String) && Contracts::TOPIC_REGEXP.match?(val) } required(:max_wait_time) { |val| val.is_a?(Integer) && val.positive? } required(:max_attempts) { |val| val.is_a?(Integer) && val.positive? } end # We validate internals just to be sure, that they are present and working nested(:internal) do required(:status) { |val| !val.nil? } required(:process) { |val| !val.nil? } nested(:connection) do nested(:proxy) do nested(:query_watermark_offsets) do required(:timeout) { |val| val.is_a?(Integer) && val.positive? } required(:max_attempts) { |val| val.is_a?(Integer) && val.positive? } required(:wait_time) { |val| val.is_a?(Integer) && val.positive? } end nested(:offsets_for_times) do required(:timeout) { |val| val.is_a?(Integer) && val.positive? } required(:max_attempts) { |val| val.is_a?(Integer) && val.positive? } required(:wait_time) { |val| val.is_a?(Integer) && val.positive? } end end end nested(:routing) do required(:builder) { |val| !val.nil? } required(:subscription_groups_builder) { |val| !val.nil? } end nested(:processing) do required(:jobs_builder) { |val| !val.nil? } required(:scheduler) { |val| !val.nil? } required(:coordinator_class) { |val| !val.nil? } required(:partitioner_class) { |val| !val.nil? } end nested(:active_job) do required(:dispatcher) { |val| !val.nil? } required(:job_options_contract) { |val| !val.nil? } required(:consumer_class) { |val| !val.nil? } end end # Ensure all root kafka keys are symbols virtual do |data, errors| next unless errors.empty? detected_errors = [] data.fetch(:kafka).each_key do |key| next if key.is_a?(Symbol) detected_errors << [[:kafka, key], :key_must_be_a_symbol] end detected_errors end # Ensure all admin kafka keys are symbols virtual do |data, errors| next unless errors.empty? detected_errors = [] data.fetch(:admin).fetch(:kafka).each_key do |key| next if key.is_a?(Symbol) detected_errors << [[:admin, :kafka, key], :key_must_be_a_symbol] end detected_errors end virtual do |data, errors| next unless errors.empty? pause_timeout = data.fetch(:pause_timeout) pause_max_timeout = data.fetch(:pause_max_timeout) next if pause_timeout <= pause_max_timeout [[%i[pause_timeout], :max_timeout_vs_pause_max_timeout]] end virtual do |data, errors| next unless errors.empty? shutdown_timeout = data.fetch(:shutdown_timeout) max_wait_time = data.fetch(:max_wait_time) next if max_wait_time < shutdown_timeout [[%i[shutdown_timeout], :shutdown_timeout_vs_max_wait_time]] end end end end