# typed: strict

module DataModel
  # Hash type has a concept of "child types": every configured key is paired
  # with a Type that is used to read the value stored under that key.
  class Builtin::Hash < Type
    include Errors
    include Logging

    # Type arguments understood by the Hash type.
    class Arguments < T::Struct
      # when true, #read accepts a nil value without reporting an error
      prop :optional, T::Boolean, default: false
      # when false, #read reports keys that have no configured child
      prop :open, T::Boolean, default: true
    end

    ## Children

    # Build the child types from a list of child schemas.
    #
    # Each entry of +params+ is an array shaped like <tt>[name, *schema]</tt>:
    # +name+ must be a Symbol, and the remaining elements are handed to
    # Scanner.scan to produce the node the child type is instantiated from.
    #
    # @param params [Array<Array<Object>>] one schema array per child
    # @raise [RuntimeError] when a child name is not a Symbol, or a child has
    #   no schema elements after its name
    sig { override.params(params: T::Array[Object]).void }
    def configure(params)
      result = T.let({}, T::Hash[Symbol, Type])
      @children = T.let(result, T.nilable(T::Hash[Symbol, Type]))

      log.debug("configuring hash children")
      T.cast(params, T::Array[T::Array[Object]]).each do |child|
        name, *schema = child

        if !name.is_a?(Symbol)
          raise "expected name as a symbol for the first element of child schemas, got #{name.inspect} for #{child.inspect}"
        end

        # a splat always yields an Array, so only emptiness needs checking
        if schema.empty?
          raise "schema for #{name} is missing"
        end

        node = Scanner.scan(schema)
        log.debug("adding hash child -> #{name}: #{node.serialize}")
        result[name] = instantiate(node.type, args: node.args, params: node.params)
      end
    end

    # The child types, keyed by name.
    #
    # @return [Hash{Symbol => Type}]
    # @raise [RuntimeError] when #configure has not been called yet
    sig { returns(T::Hash[Symbol, Type]) }
    def children
      configured = @children
      raise "children not configured" if configured.nil?

      configured
    end

    ## Read

    # Validate +val+ as a hash and read every configured child out of it.
    #
    # Child results are written back into the hash being read, so a Hash that
    # is passed in is modified in place. Reading stops at the first child that
    # reports errors.
    #
    # @param val [Object] the value to read
    # @param coerce [Boolean] when true, a non-Hash value is converted with
    #   +to_h+ (or Kernel#Hash when it only responds to +to_hash+)
    # @return [Array] a <tt>[value, errors]</tt> pair
    sig { override.params(val: Object, coerce: T::Boolean).returns(TTypeResult) }
    def read(val, coerce: false)
      args = Arguments.new(type_args)
      errors = Error.new

      if val.nil?
        # a missing value is only an error when the type is not optional
        errors.add(missing_error(Hash)) if !args.optional
        return [val, errors]
      end

      if !val.is_a?(Hash)
        # type error, early exit
        if !coerce
          errors.add(type_error(Hash, val))
          return [val, errors]
        end

        # attempt coercion
        begin
          if val.respond_to?(:to_h)
            val = T.unsafe(val).to_h
          elsif val.respond_to?(:to_hash)
            val = Hash(val)
          else
            errors.add(coerce_error(Hash, val))
            return [val, errors]
          end
        rescue TypeError, ArgumentError
          # responding to to_h does not guarantee the conversion works, e.g.
          # [1, 2, 3].to_h raises TypeError and [[1, 2, 3]].to_h raises
          # ArgumentError; report that as a coercion failure rather than
          # letting the exception escape
          errors.add(coerce_error(Hash, val))
          return [val, errors]
        end
      end

      hash = T.cast(val, T::Hash[Symbol, Object])

      # detect keys other than what is defined in the schema
      if !args.open
        extra = hash.keys - children.keys

        if !extra.empty?
          errors.add(extra_keys_error(extra))
          return [val, errors]
        end
      end

      # process children
      log.debug("processing hash children")
      children.each do |name, child|
        hash[name], child_errors = child.read(hash[name], coerce:)
        log.debug("child #{name} -> #{hash[name].inspect} #{child_errors.inspect}")

        if !child_errors.any?
          log.debug("no errors, skipping")
          next
        end

        # stop at the first child that fails
        errors.merge_child(name, child_errors)
        return [val, errors]
      end

      log.debug("hash check successful")

      # done
      [val, errors]
    end
  end
end