.generator/src/generator/openapi.py in datadog_api_client-1.8.0 vs .generator/src/generator/openapi.py in datadog_api_client-1.9.0

- old
+ new

@@ -1,6 +1,10 @@ +import hashlib +import json import pathlib +import random +import uuid import yaml import warnings from jsonref import JsonRef from urllib.parse import urlparse from yaml import CSafeLoader @@ -26,27 +30,22 @@ """Return Ruby type name for the type.""" name = get_name(schema) if name: if "enum" in schema: return name - if ( - not schema.get("additionalProperties") - and schema.get("type", "object") == "object" - ): + if not schema.get("additionalProperties") and schema.get("type", "object") == "object": return name type_ = schema.get("type") if type_ is None: if "items" in schema: type_ = "array" elif "properties" in schema: type_ = "object" else: type_ = "object" - warnings.warn( - f"Unknown type for schema: {schema} ({name or alternative_name})" - ) + warnings.warn(f"Unknown type for schema: {schema} ({name or alternative_name})") if type_ == "integer": return "Integer" elif type_ == "number": return "Float" @@ -61,22 +60,15 @@ return "Boolean" elif type_ == "array": return "Array<{}>".format(type_to_ruby(schema["items"])) elif type_ == "object": if "additionalProperties" in schema: - return "Hash<String, {}>".format( - type_to_ruby(schema["additionalProperties"]) - ) + return "Hash<String, {}>".format(type_to_ruby(schema["additionalProperties"])) return ( alternative_name if alternative_name - and ( - "properties" in schema - or "oneOf" in schema - or "anyOf" in schema - or "allOf" in schema - ) + and ("properties" in schema or "oneOf" in schema or "anyOf" in schema or "allOf" in schema) else "Object" ) elif type_ == "null": return "nil" else: @@ -84,13 +76,11 @@ def get_type_for_attribute(schema, attribute, current_name=None): """Return Ruby type name for the attribute.""" child_schema = schema.get("properties", {}).get(attribute) - alternative_name = ( - current_name + formatter.camel_case(attribute) if current_name else None - ) + alternative_name = current_name + formatter.camel_case(attribute) if current_name else None return type_to_ruby(child_schema, alternative_name=alternative_name) def get_type_for_parameter(parameter): """Return Ruby type name for the parameter.""" @@ -121,13 +111,11 @@ yield from child_models(child, seen=seen) if "items" in schema: yield from child_models( schema["items"], - alternative_name=alternative_name + "Item" - if alternative_name is not None - else None, + alternative_name=alternative_name + "Item" if alternative_name is not None else None, seen=seen, ) if ( schema.get("type") == "object" or "properties" in schema or has_sub_models @@ -145,13 +133,11 @@ if "properties" in schema or has_sub_models: seen.add(name) yield name, schema for key, child in schema.get("properties", {}).items(): - yield from child_models( - child, alternative_name=name + formatter.camel_case(key), seen=seen - ) + yield from child_models(child, alternative_name=name + formatter.camel_case(key), seen=seen) if "enum" in schema: if name is None: raise ValueError(f"Schema {schema} has no name") @@ -278,11 +264,11 @@ matrix = { ("form", False): "csv", ("form", True): "multi", # TODO add more cases from https://swagger.io/specification/#parameter-style } - if (schema.get("type") == "array" or "items" in schema): + if schema.get("type") == "array" or "items" in schema: in_ = parameter.get("in", "query") style = parameter.get("style", in_to_style[in_]) explode = parameter.get("explode", True if style == "form" else False) return matrix.get((style, explode), "multi") @@ -294,13 +280,11 @@ url = url.replace("{" + variable + "}", value) # replace server variables if they were not replace before for variable in server["variables"]: if server_variables and variable in server_variables: continue - url = url.replace( - "{" + variable + "}", server["variables"][variable]["default"] - ) + url = url.replace("{" + variable + "}", server["variables"][variable]["default"]) return urlparse(url) def server_url_and_method(spec, operation_id, server_index=0, server_variables=None): for path in spec["paths"]: @@ -310,38 +294,194 @@ if "servers" in operation: server = operation["servers"][server_index] else: server = spec["servers"][server_index] return ( - format_server( - server, server_variables=server_variables, path=path - ).geturl(), + format_server(server, server_variables=server_variables, path=path).geturl(), method, ) raise ValueError(f"Operation {operation_id} not found") def response_code_and_accept_type(operation, status_code=None): for response in operation["responses"]: if status_code is None: - return int(response), next( - iter(operation["responses"][response].get("content", {None: None})) - ) + return int(response), next(iter(operation["responses"][response].get("content", {None: None}))) if response == str(status_code): - return status_code, next( - iter(operation["responses"][response].get("content", {None: None})) - ) + return status_code, next(iter(operation["responses"][response].get("content", {None: None}))) return status_code, None def request_content_type(operation, status_code=None): return next(iter(operation.get("requestBody", {}).get("content", {None: None}))) def response(operation, status_code=None): for response in operation["responses"]: if status_code is None or response == str(status_code): - return list(operation["responses"][response]["content"].values())[0][ - "schema" - ] + return list(operation["responses"][response]["content"].values())[0]["schema"] return None + + +def generate_value(schema, use_random=False, prefix=None): + spec = schema.spec + if not use_random: + if "example" in spec: + return spec["example"] + if "default" in spec: + return spec["default"] + + if spec["type"] == "string": + if use_random: + return str( + uuid.UUID( + bytes=hashlib.sha256( + str(prefix or schema.keys).encode("utf-8"), + ).digest()[:16] + ) + ) + return "string" + elif spec["type"] == "integer": + return random.randint(0, 32000) if use_random else len(str(prefix or schema.keys)) + elif spec["type"] == "number": + return random.random() if use_random else 1.0 / len(str(prefix or schema.keys)) + elif spec["type"] == "boolean": + return True + elif spec["type"] == "array": + return [generate_value(schema[0], use_random=use_random)] + elif spec["type"] == "object": + return {key: generate_value(schema[key], use_random=use_random) for key in spec["properties"]} + else: + raise TypeError(f"Unknown type: {spec['type']}") + + +class Schema: + def __init__(self, spec, value=None, keys=None): + self.spec = spec + self.value = value if value is not None else generate_value + self.keys = keys or tuple() + + def __getattr__(self, key): + return self[key] + + def __getitem__(self, key): + type_ = self.spec.get("type", "object") + if type_ == "object": + try: + return self.__class__( + self.spec["properties"][key], + value=self.value, + keys=self.keys + (key,), + ) + except KeyError: + if "oneOf" in self.spec: + for schema in self.spec["oneOf"]: + if schema.get("type", "object") == "object": + try: + return self.__class__( + schema["properties"][key], + value=self.value, + keys=self.keys + (key,), + ) + except KeyError: + pass + raise KeyError(f"{key} not found in {self.spec.get('properties', {}).keys()}: {self.spec}") + if type_ == "array": + return self.__class__(self.spec["items"], value=self.value, keys=self.keys + (key,)) + + raise KeyError(f"{key} not found in {self.spec}") + + def __repr__(self): + value = self.value(self) + if isinstance(value, (dict, list)): + return json.dumps(value, indent=2) + return str(value) + + +class Operation: + def __init__(self, name, spec, method, path): + self.name = name + self.spec = spec + self.method = method + self.path = path + + def server_url_and_method(self, spec, server_index=0, server_variables=None): + def format_server(server, path): + url = server["url"] + path + # replace potential path variables + for variable, value in server_variables.items(): + url = url.replace("{" + variable + "}", value) + # replace server variables if they were not replace before + for variable in server["variables"]: + if variable in server_variables: + continue + url = url.replace("{" + variable + "}", server["variables"][variable]["default"]) + return url + + server_variables = server_variables or {} + if "servers" in self.spec: + server = self.spec["servers"][server_index] + else: + server = spec["servers"][server_index] + return format_server(server, self.path), self.method + + def response_code_and_accept_type(self): + for response in self.spec["responses"]: + return int(response), next(iter(self.spec["responses"][response].get("content", {None: None}))) + return None, None + + def request_content_type(self): + return next(iter(self.spec.get("requestBody", {}).get("content", {None: None}))) + + def response(self): + for response in self.spec["responses"]: + return Schema(next(iter((self.spec["responses"][response]["content"].values())))["schema"]) + + def request(self): + return Schema(next(iter(self.spec["requestBody"]["content"].values()))["schema"]) + + +def get_default(operation, attribute_path): + attrs = attribute_path.split(".") + for name, parameter in parameters(operation): + if name == attrs[0]: + break + if name == attribute_path: + # We found a top level attribute matching the full path, let's use the default + return parameter["schema"]["default"] + + if name == "body": + parameter = next(iter(parameter["content"].values()))["schema"] + for attr in attrs[1:]: + parameter = parameter["properties"][attr] + return parameter["default"] + + +def get_container(operation, attribute_path, with_type=False): + + def get_type(parameter): + if with_type: + return f", {get_type_for_parameter(parameter)}" + return "" + + attribute_name = attribute_path.split(".")[0] + for name, parameter in parameters(operation): + if name == attribute_name: + if parameter["required"]: + return '{}, "{}"{}'.format(name, ".".join(formatter.attribute_name(a) for a in attribute_path.split(".")[1:]), get_type(parameter)) + return f'opts, "{formatter.attribute_path(attribute_path)}"{get_type(parameter)}' + + +def get_type_at_path(operation, attribute_path): + content = None + for code, response in operation.get("responses", {}).items(): + if int(code) >= 300: + continue + for content in response.get("content", {}).values(): + if "schema" in content: + break + if content is None: + raise RuntimeError("Default response not found") + for attr in attribute_path.split("."): + content = content["schema"]["properties"][attr] + return get_name(content.get("items"))