diff --git a/openapi_python_client/__init__.py b/openapi_python_client/__init__.py index 8c97d45e9..61353eade 100644 --- a/openapi_python_client/__init__.py +++ b/openapi_python_client/__init__.py @@ -3,19 +3,20 @@ import shutil import subprocess import sys +import urllib from enum import Enum from pathlib import Path -from typing import Any, Dict, Optional, Sequence, Union +from typing import Any, Dict, Optional, Sequence, Union, cast import httpcore import httpx -import yaml from jinja2 import BaseLoader, ChoiceLoader, Environment, FileSystemLoader, PackageLoader from openapi_python_client import utils from .parser import GeneratorData, import_string_from_reference from .parser.errors import GeneratorError +from .resolver.schema_resolver import SchemaResolver from .utils import snake_case if sys.version_info.minor < 8: # version did not exist before 3.8, need to use a backport @@ -287,20 +288,21 @@ def update_existing_client( def _get_document(*, url: Optional[str], path: Optional[Path]) -> Union[Dict[str, Any], GeneratorError]: - yaml_bytes: bytes if url is not None and path is not None: return GeneratorError(header="Provide URL or Path, not both.") - if url is not None: - try: - response = httpx.get(url) - yaml_bytes = response.content - except (httpx.HTTPError, httpcore.NetworkError): - return GeneratorError(header="Could not get OpenAPI document from provided URL") - elif path is not None: - yaml_bytes = path.read_bytes() - else: + + if url is None and path is None: return GeneratorError(header="No URL or Path provided") + + source = cast(Union[str, Path], (url if url is not None else path)) try: - return yaml.safe_load(yaml_bytes) - except yaml.YAMLError: + resolver = SchemaResolver(source) + result = resolver.resolve() + if len(result.errors) > 0: + return GeneratorError(header="; ".join(result.errors)) + except (httpx.HTTPError, httpcore.NetworkError, urllib.error.URLError): + return GeneratorError(header="Could not get OpenAPI document from provided URL") + except Exception: return GeneratorError(header="Invalid YAML from provided source") + + return result.schema diff --git a/openapi_python_client/resolver/__init__.py b/openapi_python_client/resolver/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/openapi_python_client/resolver/data_loader.py b/openapi_python_client/resolver/data_loader.py new file mode 100644 index 000000000..aaa0fd26b --- /dev/null +++ b/openapi_python_client/resolver/data_loader.py @@ -0,0 +1,22 @@ +import yaml + +from .resolver_types import SchemaData + + +class DataLoader: + @classmethod + def load(cls, path: str, data: bytes) -> SchemaData: + data_type = path.split(".")[-1].casefold() + + if data_type == "json": + return cls.load_json(data) + else: + return cls.load_yaml(data) + + @classmethod + def load_json(cls, data: bytes) -> SchemaData: + raise NotImplementedError() + + @classmethod + def load_yaml(cls, data: bytes) -> SchemaData: + return yaml.safe_load(data) diff --git a/openapi_python_client/resolver/pointer.py b/openapi_python_client/resolver/pointer.py new file mode 100644 index 000000000..36874e294 --- /dev/null +++ b/openapi_python_client/resolver/pointer.py @@ -0,0 +1,48 @@ +import urllib.parse +from typing import List, Union + + +class Pointer: + """ https://tools.ietf.org/html/rfc6901 """ + + def __init__(self, pointer: str) -> None: + if pointer is None or pointer != "" and not pointer.startswith("/"): + raise ValueError(f'Invalid pointer value {pointer}, it must match: *( "/" reference-token )') + + self._pointer = pointer + + @property + def value(self) -> str: + return self._pointer + + @property + def parent(self) -> Union["Pointer", None]: + tokens = self.tokens(False) + + if len(tokens) > 1: + tokens.pop() + return Pointer("/".join(tokens)) + else: + assert tokens[-1] == "" + return None + + def tokens(self, unescape: bool = True) -> List[str]: + tokens = [] + + if unescape: + for token in self._pointer.split("/"): + tokens.append(self._unescape(token)) + else: + tokens = self._pointer.split("/") + + return tokens + + @property + def unescapated_value(self) -> str: + return self._unescape(self._pointer) + + def _unescape(self, data: str) -> str: + data = urllib.parse.unquote(data) + data = data.replace("~1", "/") + data = data.replace("~0", "~") + return data diff --git a/openapi_python_client/resolver/reference.py b/openapi_python_client/resolver/reference.py new file mode 100644 index 000000000..534232bcd --- /dev/null +++ b/openapi_python_client/resolver/reference.py @@ -0,0 +1,51 @@ +import urllib.parse + +from .pointer import Pointer + + +class Reference: + """ https://tools.ietf.org/html/draft-pbryan-zyp-json-ref-03 """ + + def __init__(self, reference: str): + self._ref = reference + self._parsed_ref = urllib.parse.urlparse(reference) + + @property + def path(self) -> str: + return urllib.parse.urldefrag(self._parsed_ref.geturl()).url + + @property + def pointer(self) -> Pointer: + frag = self._parsed_ref.fragment + if self.is_url() and frag != "" and not frag.startswith("/"): + frag = f"/{frag}" + + return Pointer(frag) + + def is_relative(self) -> bool: + """ return True if reference path is a relative path """ + return not self.is_absolute() + + def is_absolute(self) -> bool: + """ return True is reference path is an absolute path """ + return self._parsed_ref.netloc != "" + + @property + def value(self) -> str: + return self._ref + + def is_url(self) -> bool: + """ return True if the reference path is pointing to an external url location """ + return self.is_remote() and self._parsed_ref.netloc != "" + + def is_remote(self) -> bool: + """ return True if the reference pointer is pointing to a remote document """ + return not self.is_local() + + def is_local(self) -> bool: + """ return True if the reference pointer is pointing to the current document """ + return self._parsed_ref.path == "" + + def is_full_document(self) -> bool: + """ return True if the reference pointer is pointing to the whole document content """ + return self.pointer.parent is None diff --git a/openapi_python_client/resolver/resolved_schema.py b/openapi_python_client/resolver/resolved_schema.py new file mode 100644 index 000000000..7e528e694 --- /dev/null +++ b/openapi_python_client/resolver/resolved_schema.py @@ -0,0 +1,202 @@ +import hashlib +from typing import Any, Dict, Generator, List, Tuple, Union, cast + +from .reference import Reference +from .resolver_types import SchemaData + + +class ResolvedSchema: + def __init__(self, root: SchemaData, refs: Dict[str, SchemaData], errors: List[str]): + self._root: SchemaData = root + self._refs: Dict[str, SchemaData] = refs + self._errors: List[str] = errors + self._resolved_remotes_components: SchemaData = cast(SchemaData, {}) + + self._resolved_schema: SchemaData = cast(SchemaData, {}) + if len(self._errors) == 0: + self._process() + + @property + def schema(self) -> SchemaData: + return self._root + + @property + def errors(self) -> List[str]: + return self._errors.copy() + + def _process(self) -> None: + self._process_remote_paths() + self._process_remote_components(self._root) + self._root.update(self._resolved_remotes_components) + + def _process_remote_paths(self) -> None: + refs_to_replace = [] + for owner, ref_key, ref_val in self._lookup_schema_references_in(self._root, "paths"): + ref = Reference(ref_val) + + if ref.is_local(): + continue + + remote_path = ref.pointer.value + path = ref.path + + if remote_path not in self._refs: + self._errors.append("Failed to resolve remote reference > {0}".format(remote_path)) + else: + remote_schema = self._refs[remote_path] + remote_value = self._lookup_dict(remote_schema, path) + if not remote_value: + self._errors.append("Failed to read remote value {}, in remote ref {}".format(path, remote_path)) + else: + refs_to_replace.append((owner, remote_schema, remote_value)) + + for owner, remote_schema, remote_value in refs_to_replace: + self._process_remote_components(remote_schema, remote_value, 1) + self._replace_reference_with(owner, remote_value) + + def _process_remote_components( + self, owner: SchemaData, subpart: Union[SchemaData, None] = None, depth: int = 0 + ) -> None: + target = subpart if subpart else owner + + for parent, ref_key, ref_val in self._lookup_schema_references(target): + ref = Reference(ref_val) + + if ref.is_local(): + # print('Found local reference >> {0}'.format(ref.value)) + if depth > 0: + self._transform_to_local_components(owner, ref) + else: + remote_path = ref.pointer.value + if remote_path not in self._refs: + self._errors.append("Failed to resolve remote reference > {0}".format(remote_path)) + else: + remote_owner = self._refs[remote_path] + self._transform_to_local_components(remote_owner, ref) + self._transform_to_local_ref(parent, ref) + + def _transform_to_local_components(self, owner: SchemaData, ref: Reference) -> None: + self._ensure_components_dir_exists(ref) + + # print('Processing remote component > {0}'.format(ref.value)) + remote_component = self._lookup_dict(owner, ref.path) + pointer_parent = ref.pointer.parent + + if pointer_parent is not None: + root_components_dir = self._lookup_dict(self._resolved_remotes_components, pointer_parent.value) + component_name = ref.path.split("/")[-1] + + if component_name == "SorTransparentContainer" or component_name == "sorTransparentContainer": + print(ref.value) + + if remote_component is None: + print("Weirdy relookup of >> {0}".format(ref.value)) + assert ref.is_local() and self._lookup_dict(self._resolved_remotes_components, ref.path) + return + + if "$ref" in remote_component: + subref = Reference(remote_component["$ref"]) + if not subref.is_local(): + print("Lookup remote ref >>> {0}".format(subref.value)) + return self._process_remote_components(remote_component) + + if root_components_dir: + if component_name in root_components_dir: + local_component_hash = self._reference_schema_hash(root_components_dir[component_name]) + remote_component_hash = self._reference_schema_hash(remote_component) + + if local_component_hash == remote_component_hash: + return + else: + pass + # print('=' * 120) + # print('TODO: Find compoment collision to handle on >>> {0}'.format(ref.path)) + # print('Local componente {0} >> {1}'.format(local_component_hash, root_components_dir[component_name])) + # print('') + # print('Remote componente {0} >> {1}'.format(remote_component_hash, remote_component)) + # print('=' * 120) + else: + root_components_dir[component_name] = remote_component + self._process_remote_components(owner, remote_component, 2) + + def _ensure_components_dir_exists(self, ref: Reference) -> None: + cursor = self._resolved_remotes_components + pointer_dir = ref.pointer.parent + assert pointer_dir is not None + + for key in pointer_dir.value.split("/"): # noqa + if key == "": + continue + + if key not in cursor: + cursor[key] = {} + + cursor = cursor[key] + + def _transform_to_local_ref(self, owner: Dict[str, Any], ref: Reference) -> None: + owner["$ref"] = "#{0}".format(ref.path) + + def _lookup_dict(self, attr: SchemaData, query: str) -> Union[SchemaData, None]: + cursor = attr + query_parts = [] + + if query.startswith("/paths"): + query_parts = ["paths", query.replace("/paths//", "/").replace("/paths", "")] + else: + query_parts = query.split("/") + + for key in query_parts: + if key == "": + continue + + if isinstance(cursor, dict) and key in cursor: + cursor = cursor[key] + else: + return None + return cursor + + def _replace_reference_with(self, root: Dict[str, Any], new_value: Dict[str, Any]) -> None: + for key in new_value: + root[key] = new_value[key] + + root.pop("$ref") + + def _lookup_schema_references_in( + self, attr: SchemaData, path: str + ) -> Generator[Tuple[SchemaData, str, Any], None, None]: + if not isinstance(attr, dict) or path not in attr: + return + + yield from self._lookup_schema_references(attr[path]) + + def _lookup_schema_references(self, attr: Any) -> Generator[Tuple[SchemaData, str, str], None, None]: + if isinstance(attr, dict): + for key, val in attr.items(): + if key == "$ref": + yield cast(SchemaData, attr), cast(str, key), cast(str, val) + else: + yield from self._lookup_schema_references(val) + + elif isinstance(attr, list): + for val in attr: + yield from self._lookup_schema_references(val) + + def _reference_schema_hash(self, schema: Dict[str, Any]) -> str: + md5 = hashlib.md5() + hash_elms = [] + for key in schema.keys(): + if key == "description": + continue + + if key == "type": + hash_elms.append(schema[key]) + + if key == "allOf": + for item in schema[key]: + hash_elms.append(str(item)) + + hash_elms.append(key) + + hash_elms.sort() + md5.update(";".join(hash_elms).encode("utf-8")) + return md5.hexdigest() diff --git a/openapi_python_client/resolver/resolver_types.py b/openapi_python_client/resolver/resolver_types.py new file mode 100644 index 000000000..84f6cea5b --- /dev/null +++ b/openapi_python_client/resolver/resolver_types.py @@ -0,0 +1,3 @@ +from typing import Any, Dict, NewType + +SchemaData = NewType("SchemaData", Dict[str, Any]) diff --git a/openapi_python_client/resolver/schema_resolver.py b/openapi_python_client/resolver/schema_resolver.py new file mode 100644 index 000000000..5a1c602e7 --- /dev/null +++ b/openapi_python_client/resolver/schema_resolver.py @@ -0,0 +1,145 @@ +import logging +import urllib +from pathlib import Path +from typing import Any, Dict, Generator, List, Union, cast + +import httpx + +from .data_loader import DataLoader +from .reference import Reference +from .resolved_schema import ResolvedSchema +from .resolver_types import SchemaData + + +class SchemaResolver: + def __init__(self, url_or_path: Union[str, Path]): + if not url_or_path: + raise ValueError("Invalid document root reference, it shall be an remote url or local file path") + + self._root_path: Union[Path, None] = None + self._root_url: Union[str, None] = None + self._root_url_scheme: Union[str, None] = None + self._parent_path: str + + if self._isapath(url_or_path): + url_or_path = cast(Path, url_or_path) + self._root_path = url_or_path.absolute() + self._parent_path = str(self._root_path.parent) + else: + url_or_path = cast(str, url_or_path) + self._root_url = url_or_path + self._parent_path = url_or_path + try: + self._root_url_scheme = urllib.parse.urlparse(url_or_path).scheme + if self._root_url_scheme not in ["http", "https"]: + raise ValueError(f"Unsupported URL scheme '{self._root_url_scheme}', expecting http or https") + except (TypeError, AttributeError): + raise urllib.error.URLError(f"Coult not parse URL > {url_or_path}") + + def _isapath(self, url_or_path: Union[str, Path]) -> bool: + return isinstance(url_or_path, Path) + + def resolve(self, recursive: bool = True) -> ResolvedSchema: + assert self._root_path or self._root_url + + root_schema: SchemaData + external_schemas: Dict[str, SchemaData] = {} + errors: List[str] = [] + parent: str + + if self._root_path: + root_schema = self._fetch_remote_file_path(self._root_path) + elif self._root_url: + root_schema = self._fetch_url_reference(self._root_url) + + self._resolve_schema_references(self._parent_path, root_schema, external_schemas, errors, recursive) + return ResolvedSchema(root_schema, external_schemas, errors) + + def _resolve_schema_references( + self, + parent: str, + root: SchemaData, + external_schemas: Dict[str, SchemaData], + errors: List[str], + recursive: bool, + ) -> None: + + for ref in self._lookup_schema_references(root): + if ref.is_local(): + continue + + try: + path = self._absolute_path(ref.path, parent) + parent = self._parent(path) + + if path in external_schemas: + continue + + external_schemas[path] = self._fetch_remote_reference(path) + + if recursive: + self._resolve_schema_references(parent, external_schemas[path], external_schemas, errors, recursive) + + except Exception: + errors.append(f"Failed to gather external reference data of {ref.value} from {path}") + logging.exception(f"Failed to gather external reference data of {ref.value} from {path}") + + def _parent(self, abs_path: str) -> str: + if abs_path.startswith("http", 0): + return urllib.parse.urljoin(f"{abs_path}/", "..") + else: + path = Path(abs_path) + return str(path.parent) + + def _absolute_path(self, relative_path: str, parent: str) -> str: + if relative_path.startswith("http", 0): + return relative_path + + if relative_path.startswith("//"): + if parent.startswith("http"): + scheme = urllib.parse.urlparse(parent).scheme + return f"{scheme}:{relative_path}" + else: + scheme = self._root_url_scheme or "http" + return f"{scheme}:{relative_path}" + + if parent.startswith("http"): + return urllib.parse.urljoin(parent, relative_path) + else: + parent_dir = Path(parent) + abs_path = parent_dir.joinpath(relative_path) + abs_path = abs_path.resolve() + return str(abs_path) + + def _fetch_remote_reference(self, abs_path: str) -> SchemaData: + res: SchemaData + + if abs_path.startswith("http"): + res = self._fetch_url_reference(abs_path) + else: + res = self._fetch_remote_file_path(Path(abs_path)) + + return res + + def _fetch_remote_file_path(self, path: Path) -> SchemaData: + logging.info(f"Fetching remote ref file path > {path}") + return DataLoader.load(str(path), path.read_bytes()) + + def _fetch_url_reference(self, url: str) -> SchemaData: + if url.startswith("//", 0): + url = "{0}:{1}".format((self._root_url_scheme or "http"), url) + + logging.info(f"Fetching remote ref url > {url}") + return DataLoader.load(url, httpx.get(url).content) + + def _lookup_schema_references(self, attr: Any) -> Generator[Reference, None, None]: + if isinstance(attr, dict): + for key, val in attr.items(): + if key == "$ref": + yield Reference(val) + else: + yield from self._lookup_schema_references(val) + + elif isinstance(attr, list): + for val in attr: + yield from self._lookup_schema_references(val) diff --git a/tests/test___init__.py b/tests/test___init__.py index d45108181..dfcf699ba 100644 --- a/tests/test___init__.py +++ b/tests/test___init__.py @@ -1,4 +1,5 @@ import pathlib +from urllib.parse import ParseResult import httpcore import jinja2 @@ -167,7 +168,7 @@ def test__get_document_url_and_path(self, mocker): loads.assert_not_called() def test__get_document_bad_url(self, mocker): - get = mocker.patch("httpx.get", side_effect=httpcore.NetworkError) + get = mocker.patch("httpx.get") Path = mocker.patch("openapi_python_client.Path") loads = mocker.patch("yaml.safe_load") @@ -177,7 +178,7 @@ def test__get_document_bad_url(self, mocker): result = _get_document(url=url, path=None) assert result == GeneratorError(header="Could not get OpenAPI document from provided URL") - get.assert_called_once_with(url) + get.assert_not_called() Path.assert_not_called() loads.assert_not_called() @@ -188,7 +189,7 @@ def test__get_document_url_no_path(self, mocker): from openapi_python_client import _get_document - url = mocker.MagicMock() + url = "http://localhost/" _get_document(url=url, path=None) get.assert_called_once_with(url) @@ -198,6 +199,7 @@ def test__get_document_url_no_path(self, mocker): def test__get_document_path_no_url(self, mocker): get = mocker.patch("httpx.get") loads = mocker.patch("yaml.safe_load") + mocker.patch("openapi_python_client.resolver.schema_resolver.SchemaResolver._isapath", return_value=True) from openapi_python_client import _get_document @@ -205,12 +207,13 @@ def test__get_document_path_no_url(self, mocker): _get_document(url=None, path=path) get.assert_not_called() - path.read_bytes.assert_called_once() - loads.assert_called_once_with(path.read_bytes()) + path.absolute().read_bytes.assert_called_once() + loads.assert_called_once_with(path.absolute().read_bytes()) def test__get_document_bad_yaml(self, mocker): get = mocker.patch("httpx.get") loads = mocker.patch("yaml.safe_load", side_effect=yaml.YAMLError) + mocker.patch("openapi_python_client.resolver.schema_resolver.SchemaResolver._isapath", return_value=True) from openapi_python_client import _get_document @@ -218,8 +221,8 @@ def test__get_document_bad_yaml(self, mocker): result = _get_document(url=None, path=path) get.assert_not_called() - path.read_bytes.assert_called_once() - loads.assert_called_once_with(path.read_bytes()) + path.absolute().read_bytes.assert_called_once() + loads.assert_called_once_with(path.absolute().read_bytes()) assert result == GeneratorError(header="Invalid YAML from provided source") diff --git a/tests/test_resolver/test_resolver_data_loader.py b/tests/test_resolver/test_resolver_data_loader.py new file mode 100644 index 000000000..ed20dd95f --- /dev/null +++ b/tests/test_resolver/test_resolver_data_loader.py @@ -0,0 +1,50 @@ +import pytest + + +def test_load(mocker): + from openapi_python_client.resolver.data_loader import DataLoader + + dl_load_json = mocker.patch("openapi_python_client.resolver.data_loader.DataLoader.load_json") + dl_load_yaml = mocker.patch("openapi_python_client.resolver.data_loader.DataLoader.load_yaml") + + content = mocker.MagicMock() + DataLoader.load("foobar.json", content) + dl_load_json.assert_called_once_with(content) + + content = mocker.MagicMock() + DataLoader.load("foobar.jSoN", content) + dl_load_json.assert_called_with(content) + + content = mocker.MagicMock() + DataLoader.load("foobar.yaml", content) + dl_load_yaml.assert_called_once_with(content) + + content = mocker.MagicMock() + DataLoader.load("foobar.yAmL", content) + dl_load_yaml.assert_called_with(content) + + content = mocker.MagicMock() + DataLoader.load("foobar.ymL", content) + dl_load_yaml.assert_called_with(content) + + content = mocker.MagicMock() + DataLoader.load("foobar", content) + dl_load_yaml.assert_called_with(content) + + +def test_load_yaml(mocker): + from openapi_python_client.resolver.data_loader import DataLoader + + yaml_safeload = mocker.patch("yaml.safe_load") + + content = mocker.MagicMock() + DataLoader.load_yaml(content) + yaml_safeload.assert_called_once_with(content) + + +def test_load_json(mocker): + from openapi_python_client.resolver.data_loader import DataLoader + + content = mocker.MagicMock() + with pytest.raises(NotImplementedError): + DataLoader.load_json(content) diff --git a/tests/test_resolver/test_resolver_pointer.py b/tests/test_resolver/test_resolver_pointer.py new file mode 100644 index 000000000..92e1ded35 --- /dev/null +++ b/tests/test_resolver/test_resolver_pointer.py @@ -0,0 +1,97 @@ +import pytest + + +def get_data_set(): + # https://tools.ietf.org/html/rfc6901 + return { + "valid_pointers": [ + "/myElement", + "/definitions/myElement", + "", + "/foo", + "/foo/0", + "/", + "/a~1b", + "/c%d", + "/e^f", + "/g|h", + "/i\\j" '/k"l', + "/ ", + "/m~0n", + "/m~01", + ], + "invalid_pointers": ["../foo", "foobar", None], + "tokens_by_pointer": { + "/myElement": ["", "myElement"], + "/definitions/myElement": ["", "definitions", "myElement"], + "": [""], + "/foo": ["", "foo"], + "/foo/0": ["", "foo", "0"], + "/": ["", ""], + "/a~1b": ["", "a/b"], + "/c%d": ["", "c%d"], + "/e^f": ["", "e^f"], + "/g|h": ["", "g|h"], + "/i\\j": ["", "i\\j"], + '/k"l': ["", 'k"l'], + "/ ": ["", " "], + "/m~0n": ["", "m~n"], + "/m~01": ["", "m~1"], + }, + } + + +def test___init__(): + from openapi_python_client.resolver.pointer import Pointer + + data_set = get_data_set() + + for pointer_str in data_set["valid_pointers"]: + p = Pointer(pointer_str) + assert p.value != None + assert p.value == pointer_str + + for pointer_str in data_set["invalid_pointers"]: + with pytest.raises(ValueError): + p = Pointer(pointer_str) + + +def test_token(): + from openapi_python_client.resolver.pointer import Pointer + + data_set = get_data_set() + + for pointer_str in data_set["tokens_by_pointer"].keys(): + p = Pointer(pointer_str) + expected_tokens = data_set["tokens_by_pointer"][pointer_str] + + for idx, token in enumerate(p.tokens()): + assert expected_tokens[idx] == token + + +def test_parent(): + from openapi_python_client.resolver.pointer import Pointer + + data_set = get_data_set() + + for pointer_str in data_set["tokens_by_pointer"].keys(): + p = Pointer(pointer_str) + expected_tokens = data_set["tokens_by_pointer"][pointer_str] + + while p.parent is not None: + p = p.parent + expected_tokens.pop() + assert p.tokens()[-1] == expected_tokens[-1] + assert len(p.tokens()) == len(expected_tokens) + + assert len(expected_tokens) == 1 + assert expected_tokens[-1] == "" + + +def test__unescape_and__escape(): + from openapi_python_client.resolver.pointer import Pointer + + escaped_unescaped_values = [("/m~0n", "/m~n"), ("/m~01", "/m~1"), ("/a~1b", "/a/b"), ("/foobar", "/foobar")] + + for escaped, unescaped in escaped_unescaped_values: + assert Pointer(escaped).unescapated_value == unescaped diff --git a/tests/test_resolver/test_resolver_reference.py b/tests/test_resolver/test_resolver_reference.py new file mode 100644 index 000000000..6782426f3 --- /dev/null +++ b/tests/test_resolver/test_resolver_reference.py @@ -0,0 +1,212 @@ +import pytest + + +def get_data_set(): + # https://swagger.io/docs/specification/using-ref/ + return { + "local_references": ["#/definitions/myElement"], + "remote_references": [ + "document.json#/myElement", + "../document.json#/myElement", + "../another-folder/document.json#/myElement", + ], + "url_references": [ + "http://path/to/your/resource", + "http://path/to/your/resource.json#myElement", + "//anotherserver.com/files/example.json", + ], + "relative_references": [ + "#/definitions/myElement", + "document.json#/myElement", + "../document.json#/myElement", + "../another-folder/document.json#/myElement", + ], + "absolute_references": [ + "http://path/to/your/resource", + "http://path/to/your/resource.json#myElement", + "//anotherserver.com/files/example.json", + ], + "full_document_references": [ + "http://path/to/your/resource", + "//anotherserver.com/files/example.json", + ], + "not_full_document_references": [ + "#/definitions/myElement", + "document.json#/myElement", + "../document.json#/myElement", + "../another-folder/document.json#/myElement", + "http://path/to/your/resource.json#myElement", + ], + "path_by_reference": { + "#/definitions/myElement": "", + "document.json#/myElement": "document.json", + "../document.json#/myElement": "../document.json", + "../another-folder/document.json#/myElement": "../another-folder/document.json", + "http://path/to/your/resource": "http://path/to/your/resource", + "http://path/to/your/resource.json#myElement": "http://path/to/your/resource.json", + "//anotherserver.com/files/example.json": "//anotherserver.com/files/example.json", + }, + "pointer_by_reference": { + "#/definitions/myElement": "/definitions/myElement", + "document.json#/myElement": "/myElement", + "../document.json#/myElement": "/myElement", + "../another-folder/document.json#/myElement": "/myElement", + "http://path/to/your/resource": "", + "http://path/to/your/resource.json#myElement": "/myElement", + "//anotherserver.com/files/example.json": "", + }, + "pointerparent_by_reference": { + "#/definitions/myElement": "/definitions", + "document.json#/myElement": "", + "../document.json#/myElement": "", + "../another-folder/document.json#/myElement": "", + "http://path/to/your/resource": None, + "http://path/to/your/resource.json#myElement": "", + "//anotherserver.com/files/example.json": None, + }, + } + + +def test_is_local(): + from openapi_python_client.resolver.reference import Reference + + data_set = get_data_set() + + for ref_str in data_set["local_references"]: + ref = Reference(ref_str) + assert ref.is_local() == True + + for ref_str in data_set["remote_references"]: + ref = Reference(ref_str) + assert ref.is_local() == False + + for ref_str in data_set["url_references"]: + ref = Reference(ref_str) + assert ref.is_local() == False + + +def test_is_remote(): + from openapi_python_client.resolver.reference import Reference + + data_set = get_data_set() + + for ref_str in data_set["local_references"]: + ref = Reference(ref_str) + assert ref.is_remote() == False + + for ref_str in data_set["remote_references"]: + ref = Reference(ref_str) + assert ref.is_remote() == True + + for ref_str in data_set["url_references"]: + ref = Reference(ref_str) + assert ref.is_remote() == True + + +def test_is_url(): + from openapi_python_client.resolver.reference import Reference + + data_set = get_data_set() + + for ref_str in data_set["local_references"]: + ref = Reference(ref_str) + assert ref.is_url() == False + + for ref_str in data_set["remote_references"]: + ref = Reference(ref_str) + assert ref.is_url() == False + + for ref_str in data_set["url_references"]: + ref = Reference(ref_str) + assert ref.is_url() == True + + +def test_is_absolute(): + from openapi_python_client.resolver.reference import Reference + + data_set = get_data_set() + + for ref_str in data_set["absolute_references"]: + ref = Reference(ref_str) + assert ref.is_absolute() == True + + for ref_str in data_set["relative_references"]: + ref = Reference(ref_str) + assert ref.is_absolute() == False + + +def test_is_relative(): + from openapi_python_client.resolver.reference import Reference + + data_set = get_data_set() + + for ref_str in data_set["absolute_references"]: + ref = Reference(ref_str) + assert ref.is_relative() == False + + for ref_str in data_set["relative_references"]: + ref = Reference(ref_str) + assert ref.is_relative() == True + + +def test_pointer(): + from openapi_python_client.resolver.reference import Reference + + data_set = get_data_set() + + for ref_str in data_set["pointer_by_reference"].keys(): + ref = Reference(ref_str) + pointer = data_set["pointer_by_reference"][ref_str] + assert ref.pointer.value == pointer + + +def test_pointer_parent(): + from openapi_python_client.resolver.reference import Reference + + data_set = get_data_set() + + for ref_str in data_set["pointerparent_by_reference"].keys(): + ref = Reference(ref_str) + pointer_parent = data_set["pointerparent_by_reference"][ref_str] + + if pointer_parent is not None: + assert ref.pointer.parent.value == pointer_parent + else: + assert ref.pointer.parent == None + + +def test_path(): + from openapi_python_client.resolver.reference import Reference + + data_set = get_data_set() + + for ref_str in data_set["path_by_reference"].keys(): + ref = Reference(ref_str) + path = data_set["path_by_reference"][ref_str] + assert ref.path == path + + +def test_is_full_document(): + from openapi_python_client.resolver.reference import Reference + + data_set = get_data_set() + + for ref_str in data_set["full_document_references"]: + ref = Reference(ref_str) + assert ref.is_full_document() == True + assert ref.pointer.parent == None + + for ref_str in data_set["not_full_document_references"]: + ref = Reference(ref_str) + assert ref.is_full_document() == False + assert ref.pointer.parent != None + + +def test_value(): + from openapi_python_client.resolver.reference import Reference + + ref = Reference("fooBaR") + assert ref.value == "fooBaR" + + ref = Reference("FooBAR") + assert ref.value == "FooBAR" diff --git a/tests/test_resolver/test_resolver_schema_resolver.py b/tests/test_resolver/test_resolver_schema_resolver.py new file mode 100644 index 000000000..36caa3d7e --- /dev/null +++ b/tests/test_resolver/test_resolver_schema_resolver.py @@ -0,0 +1,267 @@ +import pathlib +import urllib +import urllib.parse + +import pytest + + +def test___init__invalid_data(mocker): + from openapi_python_client.resolver.schema_resolver import SchemaResolver + + with pytest.raises(ValueError): + SchemaResolver(None) + + invalid_url = "foobar" + with pytest.raises(ValueError): + SchemaResolver(invalid_url) + + invalid_url = 42 + with pytest.raises(urllib.error.URLError): + SchemaResolver(invalid_url) + + invalid_url = mocker.Mock() + with pytest.raises(urllib.error.URLError): + SchemaResolver(invalid_url) + + +def test__init_with_filepath(mocker): + mocker.patch("openapi_python_client.resolver.schema_resolver.SchemaResolver._isapath", return_value=True) + mocker.patch("openapi_python_client.resolver.schema_resolver.DataLoader.load", return_value={}) + path = mocker.MagicMock() + + from openapi_python_client.resolver.schema_resolver import SchemaResolver + + resolver = SchemaResolver(path) + resolver.resolve() + + path.absolute().read_bytes.assert_called_once() + + +def test__init_with_url(mocker): + mocker.patch("openapi_python_client.resolver.schema_resolver.DataLoader.load", return_value={}) + url_parse = mocker.patch( + "urllib.parse.urlparse", + return_value=urllib.parse.ParseResult( + scheme="http", netloc="foobar.io", path="foo", params="", query="", fragment="/bar" + ), + ) + get = mocker.patch("httpx.get") + + url = mocker.MagicMock() + + from openapi_python_client.resolver.schema_resolver import SchemaResolver + + resolver = SchemaResolver(url) + resolver.resolve() + + url_parse.assert_called_once_with(url) + get.assert_called_once() + + +def test__resolve_schema_references_with_path(mocker): + read_bytes = mocker.patch("pathlib.Path.read_bytes") + + from openapi_python_client.resolver.schema_resolver import SchemaResolver + + path = pathlib.Path("/foo/bar/foobar") + path_parent = str(path.parent) + schema = {"foo": {"$ref": "foobar#/foobar"}} + external_schemas = {} + errors = [] + + def _datalaod_mocked_result(path, data): + if path == "/foo/bar/foobar": + return {"foobar": "bar", "bar": {"$ref": "bar#/foobar"}, "local": {"$ref": "#/toto"}} + + if path == "/foo/bar/bar": + return {"foobar": "bar", "bar": {"$ref": "../bar#/foobar"}} + + if path == "/foo/bar": + return {"foobar": "bar/bar", "bar": {"$ref": "/barfoo.io/foobar#foobar"}} + + if path == "/barfoo.io/foobar": + return {"foobar": "barfoo.io/foobar", "bar": {"$ref": "./bar#foobar"}} + + if path == "/barfoo.io/bar": + return {"foobar": "barfoo.io/bar", "bar": {"$ref": "/bar.foo/foobar"}} + + if path == "/bar.foo/foobar": + return {"foobar": "bar.foo/foobar", "bar": {"$ref": "/foo.bar/foobar"}} + + if path == "/foo.bar/foobar": + return {"foobar": "foo.bar/foobar", "bar": {"$ref": "/foo/bar/foobar"}} # Loop to first path + + raise ValueError(f"Unexpected path {path}") + + mocker.patch("openapi_python_client.resolver.schema_resolver.DataLoader.load", _datalaod_mocked_result) + resolver = SchemaResolver(path) + resolver._resolve_schema_references(path_parent, schema, external_schemas, errors, True) + + assert len(errors) == 0 + assert "/foo/bar/foobar" in external_schemas + assert "/foo/bar/bar" in external_schemas + assert "/foo/bar" in external_schemas + assert "/barfoo.io/foobar" in external_schemas + assert "/barfoo.io/bar" in external_schemas + assert "/bar.foo/foobar" in external_schemas + assert "/foo.bar/foobar" in external_schemas + + +def test__resolve_schema_references_with_url(mocker): + get = mocker.patch("httpx.get") + + from openapi_python_client.resolver.schema_resolver import SchemaResolver + + url = "http://foobar.io/foo/bar/foobar" + url_parent = "http://foobar.io/foo/bar/" + schema = {"foo": {"$ref": "foobar#/foobar"}} + external_schemas = {} + errors = [] + + def _datalaod_mocked_result(url, data): + if url == "http://foobar.io/foo/bar/foobar": + return {"foobar": "bar", "bar": {"$ref": "bar#/foobar"}, "local": {"$ref": "#/toto"}} + + if url == "http://foobar.io/foo/bar/bar": + return {"foobar": "bar", "bar": {"$ref": "../bar#/foobar"}} + + if url == "http://foobar.io/foo/bar": + return {"foobar": "bar/bar", "bar": {"$ref": "//barfoo.io/foobar#foobar"}} + + if url == "http://barfoo.io/foobar": + return {"foobar": "barfoo.io/foobar", "bar": {"$ref": "./bar#foobar"}} + + if url == "http://barfoo.io/bar": + return {"foobar": "barfoo.io/bar", "bar": {"$ref": "https://bar.foo/foobar"}} + + if url == "https://bar.foo/foobar": + return {"foobar": "bar.foo/foobar", "bar": {"$ref": "//foo.bar/foobar"}} + + if url == "https://foo.bar/foobar": + return {"foobar": "foo.bar/foobar", "bar": {"$ref": "http://foobar.io/foo/bar/foobar"}} # Loop to first uri + + raise ValueError(f"Unexpected url {url}") + + mocker.patch("openapi_python_client.resolver.schema_resolver.DataLoader.load", _datalaod_mocked_result) + + resolver = SchemaResolver(url) + resolver._resolve_schema_references(url_parent, schema, external_schemas, errors, True) + + assert len(errors) == 0 + assert "http://foobar.io/foo/bar/bar" in external_schemas + assert "http://foobar.io/foo/bar" in external_schemas + assert "http://barfoo.io/foobar" in external_schemas + assert "http://barfoo.io/foobar" in external_schemas + assert "http://barfoo.io/bar" in external_schemas + assert "https://bar.foo/foobar" in external_schemas + assert "https://foo.bar/foobar" in external_schemas + + +def test__resolve_schema_references_mix_path_and_url(mocker): + read_bytes = mocker.patch("pathlib.Path.read_bytes") + get = mocker.patch("httpx.get") + + from openapi_python_client.resolver.schema_resolver import SchemaResolver + + path = pathlib.Path("/foo/bar/foobar") + path_parent = str(path.parent) + schema = {"foo": {"$ref": "foobar#/foobar"}} + external_schemas = {} + errors = [] + + def _datalaod_mocked_result(path, data): + if path == "/foo/bar/foobar": + return {"foobar": "bar", "bar": {"$ref": "bar#/foobar"}, "local": {"$ref": "#/toto"}} + + if path == "/foo/bar/bar": + return {"foobar": "bar", "bar": {"$ref": "../bar#/foobar"}} + + if path == "/foo/bar": + return {"foobar": "bar/bar", "bar": {"$ref": "//barfoo.io/foobar#foobar"}} + + if path == "http://barfoo.io/foobar": + return {"foobar": "barfoo.io/foobar", "bar": {"$ref": "./bar#foobar"}} + + if path == "http://barfoo.io/bar": + return {"foobar": "barfoo.io/bar", "bar": {"$ref": "https://bar.foo/foobar"}} + + if path == "https://bar.foo/foobar": + return {"foobar": "bar.foo/foobar", "bar": {"$ref": "//foo.bar/foobar"}} + + if path == "https://foo.bar/foobar": + return {"foobar": "foo.bar/foobar"} + + raise ValueError(f"Unexpected path {path}") + + mocker.patch("openapi_python_client.resolver.schema_resolver.DataLoader.load", _datalaod_mocked_result) + resolver = SchemaResolver(path) + resolver._resolve_schema_references(path_parent, schema, external_schemas, errors, True) + + assert len(errors) == 0 + assert "/foo/bar/foobar" in external_schemas + assert "/foo/bar/bar" in external_schemas + assert "/foo/bar" in external_schemas + assert "http://barfoo.io/foobar" in external_schemas + assert "http://barfoo.io/bar" in external_schemas + assert "https://bar.foo/foobar" in external_schemas + assert "https://foo.bar/foobar" in external_schemas + + +def test__resolve_schema_references_with_error(mocker): + get = mocker.patch("httpx.get") + + import httpcore + + from openapi_python_client.resolver.schema_resolver import SchemaResolver + + url = "http://foobar.io/foo/bar/foobar" + url_parent = "http://foobar.io/foo/bar/" + schema = {"foo": {"$ref": "foobar#/foobar"}} + external_schemas = {} + errors = [] + + def _datalaod_mocked_result(url, data): + if url == "http://foobar.io/foo/bar/foobar": + return { + "foobar": "bar", + "bar": {"$ref": "bar#/foobar"}, + "barfoor": {"$ref": "barfoo#foobar"}, + "local": {"$ref": "#/toto"}, + } + + if url == "http://foobar.io/foo/bar/bar": + raise httpcore.NetworkError("mocked error") + + if url == "http://foobar.io/foo/bar/barfoo": + return {"foobar": "foo/bar/barfoo", "bar": {"$ref": "//barfoo.io/foobar#foobar"}} + + if url == "http://barfoo.io/foobar": + return {"foobar": "foobar"} + + mocker.patch("openapi_python_client.resolver.schema_resolver.DataLoader.load", _datalaod_mocked_result) + resolver = SchemaResolver(url) + resolver._resolve_schema_references(url_parent, schema, external_schemas, errors, True) + + assert len(errors) == 1 + assert errors[0] == "Failed to gather external reference data of bar#/foobar from http://foobar.io/foo/bar/bar" + assert "http://foobar.io/foo/bar/bar" not in external_schemas + assert "http://foobar.io/foo/bar/foobar" in external_schemas + assert "http://foobar.io/foo/bar/barfoo" in external_schemas + assert "http://barfoo.io/foobar" in external_schemas + + +def test___lookup_schema_references(): + from openapi_python_client.resolver.schema_resolver import SchemaResolver + + data_set = { + "foo": {"$ref": "#/ref_1"}, + "bar": {"foobar": {"$ref": "#/ref_2"}}, + "foobar": [{"foo": {"$ref": "#/ref_3"}}, {"bar": [{"foobar": {"$ref": "#/ref_4"}}]}], + } + + resolver = SchemaResolver("http://foobar.io") + expected_references = sorted([f"#/ref_{x}" for x in range(1, 5)]) + references = sorted([x.value for x in resolver._lookup_schema_references(data_set)]) + + for idx, ref in enumerate(references): + assert expected_references[idx] == ref