Skip to content

Extend dynamicRef keyword #886

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jan 14, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 82 additions & 0 deletions jsonschema/_utils.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from collections import deque
from collections.abc import Mapping, MutableMapping, Sequence
from urllib.parse import urlsplit
import itertools
Expand Down Expand Up @@ -346,3 +347,84 @@ def find_evaluated_property_keys_by_schema(validator, instance, schema):
)

return evaluated_keys


def _schema_is_referenced(schema, parent_schema):
"""
Checks if a schema is referenced by another schema
"""
return ("$id" in schema and "$ref" in parent_schema
and parent_schema["$ref"] == schema["$id"])


def _find_dynamic_anchor_extender(validator, scopes, fragment, schema):
"""
Find a schema that extends the dynamic anchor
"""
for url in scopes:
with validator.resolver.resolving(url) as parent_schema:
if _schema_is_referenced(schema, parent_schema):
return validator.resolver.resolve_fragment(
parent_schema,
fragment,
)


def _find_dynamic_anchor_intermediate(validator, scopes, fragment):
"""
Find a schema that extends the dynamic anchor by an intermediate schema
"""
for url in scopes:
with validator.resolver.resolving(url) as schema:
if "$id" in schema:
for intermediate_url in scopes:
with validator.resolver.resolving(
intermediate_url) as intermediate_schema:
for subschema in search_schema(
intermediate_schema, match_keyword("$ref")):
if _schema_is_referenced(subschema, schema):
return _find_dynamic_anchor_extender(
validator,
scopes,
fragment,
subschema,
)


def dynamic_anchor_extender(validator, scopes, fragment, schema, subschema):
extender_schema = _find_dynamic_anchor_extender(
validator, scopes, fragment, schema,
)
if not extender_schema:
extender_schema = _find_dynamic_anchor_extender(
validator, scopes, fragment, subschema,
)
if not extender_schema:
extender_schema = _find_dynamic_anchor_intermediate(
validator, scopes, fragment,
)

return extender_schema


def match_keyword(keyword):

def matcher(value):
if keyword in value:
yield value

return matcher


def search_schema(schema, matcher):
"""Breadth-first search routine."""
values = deque([schema])
while values:
value = values.pop()
if isinstance(value, list):
values.extendleft(value)
continue
if not isinstance(value, dict):
continue
yield from matcher(value)
values.extendleft(value.values())
12 changes: 10 additions & 2 deletions jsonschema/_validators.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import re

from jsonschema._utils import (
dynamic_anchor_extender,
ensure_list,
equal,
extras_msg,
Expand Down Expand Up @@ -302,14 +303,21 @@ def ref(validator, ref, instance, schema):

def dynamicRef(validator, dynamicRef, instance, schema):
_, fragment = urldefrag(dynamicRef)

for url in validator.resolver._scopes_stack:
lookup_url = urljoin(url, dynamicRef)
with validator.resolver.resolving(lookup_url) as subschema:
if ("$dynamicAnchor" in subschema
and fragment == subschema["$dynamicAnchor"]):
scope_stack = list(validator.resolver._scopes_stack)
scope_stack.reverse()
extended_schema = dynamic_anchor_extender(
validator, scope_stack, fragment, schema, subschema,
)
if extended_schema:
yield from validator.descend(instance, extended_schema)
break

yield from validator.descend(instance, subschema)
break
else:
with validator.resolver.resolving(dynamicRef) as subschema:
yield from validator.descend(instance, subschema)
Expand Down
10 changes: 1 addition & 9 deletions jsonschema/tests/test_jsonschema_test_suite.py
Original file line number Diff line number Diff line change
Expand Up @@ -405,15 +405,7 @@ def leap_second(test):
skip=lambda test: (
narrow_unicode_build(test)
or skip(
message="dynamicRef support isn't working yet.",
subject="dynamicRef",
)(test)
or skip(
message="These tests depends on dynamicRef working.",
subject="defs",
)(test)
or skip(
message="These tests depends on dynamicRef working.",
message="These tests require an extension or the url resolver.",
subject="anchor",
case_description="same $anchor with different base uri",
)(test)
Expand Down
28 changes: 5 additions & 23 deletions jsonschema/validators.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
"""
from __future__ import annotations

from collections import deque
from collections.abc import Sequence
from functools import lru_cache
from urllib.parse import unquote, urldefrag, urljoin, urlsplit
Expand Down Expand Up @@ -770,7 +769,7 @@ def _find_in_referrer(self, key):
@lru_cache()
def _get_subschemas_cache(self):
cache = {key: [] for key in _SUBSCHEMAS_KEYWORDS}
for keyword, subschema in _search_schema(
for keyword, subschema in _utils.search_schema(
self.referrer, _match_subschema_keywords,
):
cache[keyword].append(subschema)
Expand Down Expand Up @@ -844,7 +843,10 @@ def resolve_fragment(self, document, fragment):
else:

def find(key):
yield from _search_schema(document, _match_keyword(key))
yield from _utils.search_schema(
document,
_utils.match_keyword(key),
)

for keyword in ["$anchor", "$dynamicAnchor"]:
for subschema in find(keyword):
Expand Down Expand Up @@ -930,32 +932,12 @@ def resolve_remote(self, uri):
_SUBSCHEMAS_KEYWORDS = ("$id", "id", "$anchor", "$dynamicAnchor")


def _match_keyword(keyword):

def matcher(value):
if keyword in value:
yield value

return matcher


def _match_subschema_keywords(value):
for keyword in _SUBSCHEMAS_KEYWORDS:
if keyword in value:
yield keyword, value


def _search_schema(schema, matcher):
"""Breadth-first search routine."""
values = deque([schema])
while values:
value = values.pop()
if not isinstance(value, dict):
continue
yield from matcher(value)
values.extendleft(value.values())


def validate(instance, schema, cls=None, *args, **kwargs):
"""
Validate an instance under the given schema.
Expand Down