-
Notifications
You must be signed in to change notification settings - Fork 0
Features/add-remote-reference-support #1
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
9bbe517
102bd26
e0c20e4
c90005f
8f404a3
dde5fac
60fcdb1
3d6367f
33276c9
ad9c7da
1805b0d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,22 @@ | ||
import json

import yaml

from .resolver_types import SchemaData
|
||
|
||
class DataLoader:
    """Parse raw document bytes into schema data, choosing the parser from the file extension."""

    @classmethod
    def load(cls, path: str, data: bytes) -> SchemaData:
        """Parse ``data`` using the parser selected by the extension of ``path``.

        The text after the last ``.`` of ``path`` is compared case-insensitively:
        ``json`` selects :meth:`load_json`; anything else (including a path
        without any extension) is handed to :meth:`load_yaml`.

        Args:
            path: Location the bytes were read from; only its extension is used.
            data: Raw, still encoded document content.

        Returns:
            The parsed document.
        """
        data_type = path.split(".")[-1].casefold()

        if data_type == "json":
            return cls.load_json(data)
        return cls.load_yaml(data)

    @classmethod
    def load_json(cls, data: bytes) -> SchemaData:
        """Parse ``data`` as a JSON document.

        Raises:
            json.JSONDecodeError: if ``data`` is not valid JSON.
        """
        # json.loads accepts bytes directly and detects the UTF-8/16/32 encoding itself.
        return json.loads(data)

    @classmethod
    def load_yaml(cls, data: bytes) -> SchemaData:
        """Parse ``data`` as a YAML document with the safe loader."""
        return yaml.safe_load(data)
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,48 @@ | ||
import urllib.parse | ||
from typing import List, Union | ||
|
||
|
||
class Pointer:
    """JSON Pointer value, see https://tools.ietf.org/html/rfc6901"""

    def __init__(self, pointer: str) -> None:
        """Wrap ``pointer``, which must be empty or start with ``/``.

        Raises:
            ValueError: if ``pointer`` is ``None`` or is non-empty without a leading ``/``.
        """
        well_formed = pointer is not None and (pointer == "" or pointer.startswith("/"))
        if not well_formed:
            raise ValueError(f'Invalid pointer value {pointer}, it must match: *( "/" reference-token )')

        self._pointer = pointer

    @property
    def value(self) -> str:
        """The pointer string exactly as given to the constructor."""
        return self._pointer

    @property
    def unescapated_value(self) -> str:
        """The whole pointer string with percent- and ``~`` escapes decoded."""
        return self._unescape(self._pointer)

    @property
    def parent(self) -> Union["Pointer", None]:
        """Pointer with the last token removed, or ``None`` for the empty pointer."""
        raw_tokens = self.tokens(False)

        if len(raw_tokens) <= 1:
            # Only the empty pointer splits into a single (empty) token.
            assert raw_tokens[-1] == ""
            return None

        return Pointer("/".join(raw_tokens[:-1]))

    def tokens(self, unescape: bool = True) -> List[str]:
        """Split the pointer on ``/``.

        The first element is always the empty string that precedes the leading
        ``/`` (or the whole empty pointer). With ``unescape`` set, each token
        is decoded individually after the split.
        """
        pieces = self._pointer.split("/")
        if not unescape:
            return pieces
        return [self._unescape(piece) for piece in pieces]

    def _unescape(self, data: str) -> str:
        """Decode percent-escapes, then ``~1`` to ``/``, and finally ``~0`` to ``~``."""
        decoded = urllib.parse.unquote(data)
        return decoded.replace("~1", "/").replace("~0", "~")
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,51 @@ | ||
import urllib.parse | ||
|
||
from .pointer import Pointer | ||
|
||
|
||
class Reference:
    """JSON Reference value, see https://tools.ietf.org/html/draft-pbryan-zyp-json-ref-03"""

    def __init__(self, reference: str):
        """Keep the raw reference string together with its ``urlparse`` result."""
        self._ref = reference
        self._parsed_ref = urllib.parse.urlparse(reference)

    @property
    def value(self) -> str:
        """The reference string exactly as given to the constructor."""
        return self._ref

    @property
    def path(self) -> str:
        """The reference with its fragment part removed."""
        without_fragment, _fragment = urllib.parse.urldefrag(self._parsed_ref.geturl())
        return without_fragment

    @property
    def pointer(self) -> Pointer:
        """The fragment part of the reference wrapped in a ``Pointer``.

        For url references a non-empty fragment lacking a leading ``/`` gets
        one prepended; other fragments are passed to ``Pointer`` unchanged.
        """
        fragment = self._parsed_ref.fragment
        needs_leading_slash = self.is_url() and fragment != "" and not fragment.startswith("/")
        return Pointer(f"/{fragment}" if needs_leading_slash else fragment)

    def is_local(self) -> bool:
        """ return True if the reference pointer is pointing to the current document """
        document_part = self._parsed_ref.path
        return document_part == ""

    def is_remote(self) -> bool:
        """ return True if the reference pointer is pointing to a remote document """
        return not self.is_local()

    def is_url(self) -> bool:
        """ return True if the reference path is pointing to an external url location """
        has_host = self._parsed_ref.netloc != ""
        return self.is_remote() and has_host

    def is_absolute(self) -> bool:
        """ return True if the reference path is an absolute path """
        host = self._parsed_ref.netloc
        return host != ""

    def is_relative(self) -> bool:
        """ return True if the reference path is a relative path """
        return not self.is_absolute()

    def is_full_document(self) -> bool:
        """ return True if the reference pointer is pointing to the whole document content """
        return self.pointer.parent is None
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,202 @@ | ||
import hashlib | ||
from typing import Any, Dict, Generator, List, Tuple, Union, cast | ||
|
||
from .reference import Reference | ||
from .resolver_types import SchemaData | ||
|
||
|
||
class ResolvedSchema:
    """Fold remotely referenced schema parts into a root schema document.

    ``root`` is modified in place. Non-local ``$ref`` entries found under
    ``paths`` are replaced by the content they point to. Every other non-local
    ``$ref`` is rewritten into a local ``#...`` reference while the referenced
    component is collected, and the collected components are finally merged
    into ``root``. Problems are recorded as messages in ``errors`` instead of
    being raised.

    NOTE(review): the ``Reference`` object was refactored and only quickly
    integrated here, without tests. Several call sites below look like they
    still use ``ref.path`` where the fragment pointer is meant and
    ``ref.pointer.value`` where the document location is meant - confirm
    against ``Reference`` before relying on this class.
    """

    def __init__(self, root: SchemaData, refs: Dict[str, SchemaData], errors: List[str]):
        """Store the inputs and, when ``errors`` is empty, resolve right away.

        Args:
            root: Root schema document; modified in place by the resolution.
            refs: Already loaded remote documents, looked up by key while resolving.
            errors: Messages from earlier steps; resolution failures are appended
                to this same list object.
        """
        self._root: SchemaData = root
        # NOTE(review): in the last refactor the keys of the references dict
        # changed; the lookups into ``self._refs`` below may need fixing to match.
        self._refs: Dict[str, SchemaData] = refs
        self._errors: List[str] = errors
        self._resolved_remotes_components: SchemaData = cast(SchemaData, {})

        self._resolved_schema: SchemaData = cast(SchemaData, {})
        if len(self._errors) == 0:
            self._process()

    @property
    def schema(self) -> SchemaData:
        """The root schema document (the same object that was passed in)."""
        return self._root

    @property
    def errors(self) -> List[str]:
        """A copy of the collected error messages."""
        return self._errors.copy()

    def _process(self) -> None:
        """Resolve references under ``paths``, then the remaining ones, then merge the result."""
        self._process_remote_paths()
        self._process_remote_components(self._root)
        # TODO(review): this merge is too coarse and needs fixing. ``update``
        # replaces each top-level entry of the root wholesale with the
        # collected one instead of merging their contents.
        self._root.update(self._resolved_remotes_components)

    def _process_remote_paths(self) -> None:
        """Replace non-local ``$ref`` entries found under ``paths`` by their target content."""
        # Replacements are collected first and applied afterwards so the
        # document is not mutated while it is being walked.
        refs_to_replace = []
        for owner, _ref_key, ref_val in self._lookup_schema_references_in(self._root, "paths"):
            ref = Reference(ref_val)

            if ref.is_local():
                continue

            remote_path = ref.pointer.value
            path = ref.path

            if remote_path not in self._refs:
                self._errors.append("Failed to resolve remote reference > {0}".format(remote_path))
            else:
                remote_schema = self._refs[remote_path]
                remote_value = self._lookup_dict(remote_schema, path)
                if not remote_value:
                    self._errors.append("Failed to read remote value {}, in remote ref {}".format(path, remote_path))
                else:
                    refs_to_replace.append((owner, remote_schema, remote_value))

        for owner, remote_schema, remote_value in refs_to_replace:
            self._process_remote_components(remote_schema, remote_value, 1)
            self._replace_reference_with(owner, remote_value)

    def _process_remote_components(
        self, owner: SchemaData, subpart: Union[SchemaData, None] = None, depth: int = 0
    ) -> None:
        """Collect the components referenced from ``subpart`` (or from ``owner`` when it is not given).

        Args:
            owner: Document the walked part belongs to; local references are looked up in it.
            subpart: Part of ``owner`` to walk; a falsy value means the whole ``owner``.
            depth: ``0`` for the root document. Local references are only
                collected when ``depth`` is greater than ``0``.
        """
        target = subpart if subpart else owner

        for parent, _ref_key, ref_val in self._lookup_schema_references(target):
            ref = Reference(ref_val)

            if ref.is_local():
                if depth > 0:
                    self._transform_to_local_components(owner, ref)
            else:
                remote_path = ref.pointer.value
                if remote_path not in self._refs:
                    self._errors.append("Failed to resolve remote reference > {0}".format(remote_path))
                else:
                    remote_owner = self._refs[remote_path]
                    self._transform_to_local_components(remote_owner, ref)
                    self._transform_to_local_ref(parent, ref)

    def _transform_to_local_components(self, owner: SchemaData, ref: Reference) -> None:
        """Copy the component ``ref`` points to (inside ``owner``) into the collected components."""
        self._ensure_components_dir_exists(ref)

        remote_component = self._lookup_dict(owner, ref.path)
        pointer_parent = ref.pointer.parent
        if pointer_parent is None:
            # Not expected here: _ensure_components_dir_exists already requires a parent.
            return

        root_components_dir = self._lookup_dict(self._resolved_remotes_components, pointer_parent.value)
        component_name = ref.path.split("/")[-1]

        if remote_component is None:
            print("Weirdy relookup of >> {0}".format(ref.value))
            assert ref.is_local() and self._lookup_dict(self._resolved_remotes_components, ref.path)
            return

        if "$ref" in remote_component:
            subref = Reference(remote_component["$ref"])
            if not subref.is_local():
                print("Lookup remote ref >>> {0}".format(subref.value))
                return self._process_remote_components(remote_component)

        # The directory is tested with ``is not None`` on purpose: a directory
        # that was just created is an empty (falsy) dict and must still
        # receive its first component.
        # NOTE(review): the flattened diff this was recovered from does not
        # show which ``if`` the final ``else`` belonged to; it is paired with
        # the membership test here - confirm against the original file.
        if root_components_dir is not None:
            if component_name in root_components_dir:
                local_component_hash = self._reference_schema_hash(root_components_dir[component_name])
                remote_component_hash = self._reference_schema_hash(remote_component)

                if local_component_hash == remote_component_hash:
                    return
                # TODO(review): collision to handle - two remote components
                # share this name but differ in content. May be implemented
                # later, but it is definitely required. The collision
                # detection should probably be extracted into its own class so
                # that it can later be customised through dependency injection.
            else:
                root_components_dir[component_name] = remote_component
                self._process_remote_components(owner, remote_component, 2)

    def _ensure_components_dir_exists(self, ref: Reference) -> None:
        """Create the nested dicts named by the parent of ``ref``'s pointer in the collected components."""
        cursor = self._resolved_remotes_components
        pointer_dir = ref.pointer.parent
        assert pointer_dir is not None

        for key in pointer_dir.value.split("/"):  # noqa
            if key == "":
                continue

            if key not in cursor:
                cursor[key] = {}

            cursor = cursor[key]

    def _transform_to_local_ref(self, owner: Dict[str, Any], ref: Reference) -> None:
        """Overwrite ``owner["$ref"]`` with ``#`` followed by ``ref.path``."""
        owner["$ref"] = "#{0}".format(ref.path)

    def _lookup_dict(self, attr: SchemaData, query: str) -> Union[SchemaData, None]:
        """Walk ``attr`` along the ``/``-separated ``query`` and return what is found.

        A query starting with ``/paths`` is split into exactly two keys,
        ``paths`` and the remainder, because the keys below ``paths`` contain
        ``/`` themselves. Empty segments are skipped.

        Returns:
            The value at the end of the walk, or ``None`` as soon as a key is
            missing or a non-dict value is reached before the query is used up.
        """
        cursor = attr

        if query.startswith("/paths"):
            query_parts = ["paths", query.replace("/paths//", "/").replace("/paths", "")]
        else:
            query_parts = query.split("/")

        for key in query_parts:
            if key == "":
                continue

            if isinstance(cursor, dict) and key in cursor:
                cursor = cursor[key]
            else:
                return None
        return cursor

    def _replace_reference_with(self, root: Dict[str, Any], new_value: Dict[str, Any]) -> None:
        """Copy every entry of ``new_value`` into ``root`` and then drop ``root["$ref"]``."""
        for key in new_value:
            root[key] = new_value[key]

        root.pop("$ref")

    def _lookup_schema_references_in(
        self, attr: SchemaData, path: str
    ) -> Generator[Tuple[SchemaData, str, Any], None, None]:
        """Yield the references found below ``attr[path]``; nothing when that key is absent."""
        if not isinstance(attr, dict) or path not in attr:
            return

        yield from self._lookup_schema_references(attr[path])

    def _lookup_schema_references(self, attr: Any) -> Generator[Tuple[SchemaData, str, str], None, None]:
        """Recursively yield ``(owning dict, "$ref", reference value)`` for every ``$ref`` key below ``attr``."""
        if isinstance(attr, dict):
            for key, val in attr.items():
                if key == "$ref":
                    yield cast(SchemaData, attr), cast(str, key), cast(str, val)
                else:
                    yield from self._lookup_schema_references(val)

        elif isinstance(attr, list):
            for val in attr:
                yield from self._lookup_schema_references(val)

    def _reference_schema_hash(self, schema: Dict[str, Any]) -> str:
        """Return an MD5 hex digest summarising the shape of ``schema``.

        The digest is built from the key names (``description`` excluded), the
        ``type`` value and the string form of every ``allOf`` item; other
        values do not contribute to it.
        """
        md5 = hashlib.md5()
        hash_elms: List[str] = []
        for key in schema.keys():
            if key == "description":
                continue

            if key == "type":
                # str(): ``type`` may be a list of type names, which join() would reject.
                hash_elms.append(str(schema[key]))

            if key == "allOf":
                for item in schema[key]:
                    hash_elms.append(str(item))

            # str(): parsed documents can carry non-string keys.
            hash_elms.append(str(key))

        hash_elms.sort()
        md5.update(";".join(hash_elms).encode("utf-8"))
        return md5.hexdigest()
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,3 @@ | ||
from typing import Any, Dict, NewType | ||
|
||
# Dedicated static type name for a parsed schema document: a string-keyed
# dict with arbitrary values. NewType only affects type checking; at runtime
# the value stays a plain dict.
SchemaData = NewType("SchemaData", Dict[str, Any])
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I did some refactoring on the `Reference` object, which has been quickly integrated into `ResolvedSchema` but not tested; there may be some things to fix there.