diff --git a/HISTORY.rst b/HISTORY.rst index a593b3ba..2523469f 100644 --- a/HISTORY.rst +++ b/HISTORY.rst @@ -3,6 +3,8 @@ History ======= 22.1.0 (UNRELEASED) ------------------- +* cattrs now uses the CalVer versioning convention. +* ``attrs`` and dataclass structuring is now ~25% faster. * Fix an issue structuring bare ``typing.List`` s on Pythons lower than 3.9. (`#209 `_) diff --git a/poetry.lock b/poetry.lock index a1f7634e..513146ff 100644 --- a/poetry.lock +++ b/poetry.lock @@ -192,7 +192,7 @@ doc = ["myst-parser", "sphinx-copybutton", "sphinx-design", "sphinx-inline-tabs" [[package]] name = "hypothesis" -version = "6.34.2" +version = "6.35.1" description = "A library for property-based testing" category = "dev" optional = false @@ -247,7 +247,7 @@ test = ["flake8 (>=3.8.4,<3.9.0)", "pycodestyle (>=2.6.0,<2.7.0)"] [[package]] name = "importlib-metadata" -version = "4.10.0" +version = "4.10.1" description = "Read metadata from Python packages" category = "dev" optional = false @@ -430,7 +430,7 @@ python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*" [[package]] name = "pygments" -version = "2.11.1" +version = "2.11.2" description = "Pygments is a syntax highlighting package written in Python." category = "dev" optional = false @@ -541,7 +541,7 @@ python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, !=3.4.*, !=3.5.*" [[package]] name = "requests" -version = "2.27.0" +version = "2.27.1" description = "Python HTTP for Humans." category = "dev" optional = false @@ -591,7 +591,7 @@ python-versions = ">=3.6" [[package]] name = "sphinx" -version = "4.3.2" +version = "4.4.0" description = "Python documentation generator" category = "dev" optional = false @@ -603,6 +603,7 @@ babel = ">=1.3" colorama = {version = ">=0.3.5", markers = "sys_platform == \"win32\""} docutils = ">=0.14,<0.18" imagesize = "*" +importlib-metadata = {version = ">=4.4", markers = "python_version < \"3.10\""} Jinja2 = ">=2.3" packaging = "*" Pygments = ">=2.0" @@ -617,7 +618,7 @@ sphinxcontrib-serializinghtml = ">=1.1.5" [package.extras] docs = ["sphinxcontrib-websupport"] -lint = ["flake8 (>=3.5.0)", "isort", "mypy (>=0.920)", "docutils-stubs", "types-typed-ast", "types-pkg-resources", "types-requests"] +lint = ["flake8 (>=3.5.0)", "isort", "mypy (>=0.931)", "docutils-stubs", "types-typed-ast", "types-requests"] test = ["pytest", "pytest-cov", "html5lib", "cython", "typed-ast"] [[package]] @@ -764,7 +765,7 @@ python-versions = ">=3.7" [[package]] name = "urllib3" -version = "1.26.7" +version = "1.26.8" description = "HTTP library with thread-safe connection pooling, file post, and more." category = "dev" optional = false @@ -922,8 +923,8 @@ furo = [ {file = "furo-2021.11.23.tar.gz", hash = "sha256:54cecac5f3b688b5c7370d72ecdf1cd91a6c53f0f42751f4a719184b562cde70"}, ] hypothesis = [ - {file = "hypothesis-6.34.2-py3-none-any.whl", hash = "sha256:803792d416ff71307d775fe760e2b2f07ca302a2c941b576629668092b9f3e3d"}, - {file = "hypothesis-6.34.2.tar.gz", hash = "sha256:317f8d2f670fa69e258ab43e21c2befd413c559e386581f7e9641a80460b1063"}, + {file = "hypothesis-6.35.1-py3-none-any.whl", hash = "sha256:536b928d14934809d0da676579436aaa379b06df84408b4c154412e8fd4e1b91"}, + {file = "hypothesis-6.35.1.tar.gz", hash = "sha256:8533812bd277925b0c594ef2681dc8f4289a7b6be0169cc2df295d096c7cd783"}, ] idna = [ {file = "idna-3.3-py3-none-any.whl", hash = "sha256:84d9dd047ffa80596e0f246e2eab0b391788b0503584e8945f2368256d2735ff"}, @@ -951,8 +952,8 @@ immutables = [ {file = "immutables-0.15.tar.gz", hash = "sha256:3713ab1ebbb6946b7ce1387bb9d1d7f5e09c45add58c2a2ee65f963c171e746b"}, ] importlib-metadata = [ - {file = "importlib_metadata-4.10.0-py3-none-any.whl", hash = "sha256:b7cf7d3fef75f1e4c80a96ca660efbd51473d7e8f39b5ab9210febc7809012a4"}, - {file = "importlib_metadata-4.10.0.tar.gz", hash = "sha256:92a8b58ce734b2a4494878e0ecf7d79ccd7a128b5fc6014c401e0b61f006f0f6"}, + {file = "importlib_metadata-4.10.1-py3-none-any.whl", hash = "sha256:899e2a40a8c4a1aec681feef45733de8a6c58f3f6a0dbed2eb6574b4387a77b6"}, + {file = "importlib_metadata-4.10.1.tar.gz", hash = "sha256:951f0d8a5b7260e9db5e41d429285b5f451e928479f19d80818878527d36e95e"}, ] iniconfig = [ {file = "iniconfig-1.1.1-py2.py3-none-any.whl", hash = "sha256:011e24c64b7f47f6ebd835bb12a743f2fbe9a26d4cecaa7f53bc4f35ee9da8b3"}, @@ -1162,8 +1163,8 @@ pyflakes = [ {file = "pyflakes-2.3.1.tar.gz", hash = "sha256:f5bc8ecabc05bb9d291eb5203d6810b49040f6ff446a756326104746cc00c1db"}, ] pygments = [ - {file = "Pygments-2.11.1-py3-none-any.whl", hash = "sha256:9135c1af61eec0f650cd1ea1ed8ce298e54d56bcd8cc2ef46edd7702c171337c"}, - {file = "Pygments-2.11.1.tar.gz", hash = "sha256:59b895e326f0fb0d733fd28c6839bd18ad0687ba20efc26d4277fd1d30b971f4"}, + {file = "Pygments-2.11.2-py3-none-any.whl", hash = "sha256:44238f1b60a76d78fc8ca0528ee429702aae011c265fe6a8dd8b63049ae41c65"}, + {file = "Pygments-2.11.2.tar.gz", hash = "sha256:4e426f72023d88d03b2fa258de560726ce890ff3b630f88c21cbb8b2503b8c6a"}, ] pymongo = [ {file = "pymongo-3.12.3-cp27-cp27m-macosx_10_14_intel.whl", hash = "sha256:c164eda0be9048f83c24b9b2656900041e069ddf72de81c17d874d0c32f6079f"}, @@ -1330,8 +1331,8 @@ pyyaml = [ {file = "PyYAML-5.4.1.tar.gz", hash = "sha256:607774cbba28732bfa802b54baa7484215f530991055bb562efbed5b2f20a45e"}, ] requests = [ - {file = "requests-2.27.0-py2.py3-none-any.whl", hash = "sha256:f71a09d7feba4a6b64ffd8e9d9bc60f9bf7d7e19fd0e04362acb1cfc2e3d98df"}, - {file = "requests-2.27.0.tar.gz", hash = "sha256:8e5643905bf20a308e25e4c1dd379117c09000bf8a82ebccc462cfb1b34a16b5"}, + {file = "requests-2.27.1-py2.py3-none-any.whl", hash = "sha256:f22fa1e554c9ddfd16e6e41ac79759e17be9e492b3587efa038054674760e72d"}, + {file = "requests-2.27.1.tar.gz", hash = "sha256:68d7c56fd5a8999887728ef304a6d12edc7be74f1cfa47714fc8b414525c9a61"}, ] six = [ {file = "six-1.16.0-py2.py3-none-any.whl", hash = "sha256:8abb2f1d86890a2dfb989f9a77cfcfd3e47c2a354b01111771326f8aa26e0254"}, @@ -1350,8 +1351,8 @@ soupsieve = [ {file = "soupsieve-2.3.1.tar.gz", hash = "sha256:b8d49b1cd4f037c7082a9683dfa1801aa2597fb11c3a1155b7a5b94829b4f1f9"}, ] sphinx = [ - {file = "Sphinx-4.3.2-py3-none-any.whl", hash = "sha256:6a11ea5dd0bdb197f9c2abc2e0ce73e01340464feaece525e64036546d24c851"}, - {file = "Sphinx-4.3.2.tar.gz", hash = "sha256:0a8836751a68306b3fe97ecbe44db786f8479c3bf4b80e3a7f5c838657b4698c"}, + {file = "Sphinx-4.4.0-py3-none-any.whl", hash = "sha256:5da895959511473857b6d0200f56865ed62c31e8f82dd338063b84ec022701fe"}, + {file = "Sphinx-4.4.0.tar.gz", hash = "sha256:6caad9786055cb1fa22b4a365c1775816b876f91966481765d7d50e9f0dd35cc"}, ] sphinxcontrib-applehelp = [ {file = "sphinxcontrib-applehelp-1.0.2.tar.gz", hash = "sha256:a072735ec80e7675e3f432fcae8610ecf509c5f1869d17e2eecff44389cdbc58"}, @@ -1471,8 +1472,8 @@ ujson = [ {file = "ujson-5.1.0.tar.gz", hash = "sha256:a88944d2f99db71a3ca0c63d81f37e55b660edde0b07216fb65a3e46403ef004"}, ] urllib3 = [ - {file = "urllib3-1.26.7-py2.py3-none-any.whl", hash = "sha256:c4fdf4019605b6e5423637e01bc9fe4daef873709a7973e195ceba0a62bbc844"}, - {file = "urllib3-1.26.7.tar.gz", hash = "sha256:4987c65554f7a2dbf30c18fd48778ef124af6fab771a377103da0585e2336ece"}, + {file = "urllib3-1.26.8-py2.py3-none-any.whl", hash = "sha256:000ca7f471a233c2251c6c7023ee85305721bfdf18621ebff4fd17a8653427ed"}, + {file = "urllib3-1.26.8.tar.gz", hash = "sha256:0e7c33d9a63e7ddfcb86780aac87befc2fbddf46c58dbb487e0855f7ceec283c"}, ] virtualenv = [ {file = "virtualenv-20.13.0-py2.py3-none-any.whl", hash = "sha256:339f16c4a86b44240ba7223d0f93a7887c3ca04b5f9c8129da7958447d079b09"}, diff --git a/src/cattr/gen.py b/src/cattr/gen.py index 344e9b8f..660d4d53 100644 --- a/src/cattr/gen.py +++ b/src/cattr/gen.py @@ -1,557 +1,21 @@ -import linecache -import re -import uuid -from dataclasses import is_dataclass -from threading import local -from typing import ( - TYPE_CHECKING, - Any, - Callable, - Dict, - Mapping, - Optional, - Type, - TypeVar, +from cattrs.gen import ( + AttributeOverride, + make_dict_structure_fn, + make_dict_unstructure_fn, + make_hetero_tuple_unstructure_fn, + make_iterable_unstructure_fn, + make_mapping_structure_fn, + make_mapping_unstructure_fn, + override, ) -import attr -from attr import NOTHING, resolve_types - -from ._compat import ( - adapted_fields, - get_args, - get_origin, - is_annotated, - is_bare, - is_generic, -) -from ._generics import deep_copy_with - -if TYPE_CHECKING: # pragma: no cover - from cattr.converters import Converter - - -@attr.s(slots=True, frozen=True) -class AttributeOverride: - omit_if_default: Optional[bool] = attr.ib(default=None) - rename: Optional[str] = attr.ib(default=None) - omit: bool = attr.ib(default=False) # Omit the field completely. - - -def override(omit_if_default=None, rename=None, omit: bool = False): - return AttributeOverride( - omit_if_default=omit_if_default, rename=rename, omit=omit - ) - - -_neutral = AttributeOverride() -_already_generating = local() -T = TypeVar("T") - - -def make_dict_unstructure_fn( - cl, - converter, - _cattrs_omit_if_default: bool = False, - _cattrs_use_linecache: bool = True, - **kwargs, -): - """ - Generate a specialized dict unstructuring function for an attrs class or a - dataclass. - """ - origin = get_origin(cl) - attrs = adapted_fields(origin or cl) # type: ignore - - if any(isinstance(a.type, str) for a in attrs): - # PEP 563 annotations - need to be resolved. - resolve_types(cl) - - mapping = {} - if is_generic(cl): - mapping = _generate_mapping(cl, mapping) - - for base in getattr(origin, "__orig_bases__", ()): - if is_generic(base) and not str(base).startswith("typing.Generic"): - mapping = _generate_mapping(base, mapping) - break - cl = origin - - cl_name = cl.__name__ - fn_name = "unstructure_" + cl_name - globs = {} - lines = [] - post_lines = [] - - # We keep track of what we're generating to help with recursive - # class graphs. - try: - working_set = _already_generating.working_set - except AttributeError: - working_set = set() - _already_generating.working_set = working_set - if cl in working_set: - raise RecursionError() - else: - working_set.add(cl) - - try: - lines.append(f"def {fn_name}(instance):") - lines.append(" res = {") - for a in attrs: - attr_name = a.name - override = kwargs.pop(attr_name, _neutral) - if override.omit: - continue - kn = attr_name if override.rename is None else override.rename - d = a.default - - # For each attribute, we try resolving the type here and now. - # If a type is manually overwritten, this function should be - # regenerated. - handler = None - if a.type is not None: - t = a.type - if isinstance(t, TypeVar): - if t.__name__ in mapping: - t = mapping[t.__name__] - else: - handler = converter.unstructure - elif is_generic(t) and not is_bare(t) and not is_annotated(t): - t = deep_copy_with(t, mapping) - - if handler is None: - try: - handler = converter._unstructure_func.dispatch(t) - except RecursionError: - # There's a circular reference somewhere down the line - handler = converter.unstructure - else: - handler = converter.unstructure - - is_identity = handler == converter._unstructure_identity - - if not is_identity: - unstruct_handler_name = f"unstructure_{attr_name}" - globs[unstruct_handler_name] = handler - invoke = f"{unstruct_handler_name}(instance.{attr_name})" - else: - invoke = f"instance.{attr_name}" - - if d is not attr.NOTHING and ( - ( - _cattrs_omit_if_default - and override.omit_if_default is not False - ) - or override.omit_if_default - ): - def_name = f"__cattr_def_{attr_name}" - - if isinstance(d, attr.Factory): - globs[def_name] = d.factory - if d.takes_self: - post_lines.append( - f" if instance.{attr_name} != {def_name}(instance):" - ) - else: - post_lines.append( - f" if instance.{attr_name} != {def_name}():" - ) - post_lines.append(f" res['{kn}'] = {invoke}") - else: - globs[def_name] = d - post_lines.append( - f" if instance.{attr_name} != {def_name}:" - ) - post_lines.append(f" res['{kn}'] = {invoke}") - - else: - # No default or no override. - lines.append(f" '{kn}': {invoke},") - lines.append(" }") - - total_lines = lines + post_lines + [" return res"] - script = "\n".join(total_lines) - - fname = _generate_unique_filename( - cl, "unstructure", reserve=_cattrs_use_linecache - ) - - eval(compile(script, fname, "exec"), globs) - - fn = globs[fn_name] - if _cattrs_use_linecache: - linecache.cache[fname] = len(script), None, total_lines, fname - finally: - working_set.remove(cl) - - return fn - - -def _generate_mapping( - cl: Type, old_mapping: Dict[str, type] -) -> Dict[str, type]: - mapping = {} - for p, t in zip(get_origin(cl).__parameters__, get_args(cl)): - if isinstance(t, TypeVar): - continue - mapping[p.__name__] = t - - if not mapping: - return old_mapping - - return mapping - - -def make_dict_structure_fn( - cl: Type[T], - converter: "Converter", - _cattrs_forbid_extra_keys: bool = False, - _cattrs_use_linecache: bool = True, - _cattrs_prefer_attrib_converters: bool = False, - **kwargs, -) -> Callable[[Mapping[str, Any], Any], T]: - """Generate a specialized dict structuring function for an attrs class.""" - - mapping = {} - if is_generic(cl): - base = get_origin(cl) - mapping = _generate_mapping(cl, mapping) - cl = base - - for base in getattr(cl, "__orig_bases__", ()): - if is_generic(base) and not str(base).startswith("typing.Generic"): - mapping = _generate_mapping(base, mapping) - break - - if isinstance(cl, TypeVar): - cl = mapping.get(cl.__name__, cl) - - cl_name = cl.__name__ - fn_name = "structure_" + cl_name - - # We have generic parameters and need to generate a unique name for the function - for p in getattr(cl, "__parameters__", ()): - # This is nasty, I am not sure how best to handle `typing.List[str]` or `TClass[int, int]` as a parameter type here - name_base = mapping[p.__name__] - name = getattr(name_base, "__name__", None) or str(name_base) - name = re.sub(r"[\[\.\] ,]", "_", name) - fn_name += f"_{name}" - - globs = {"__c_s": converter.structure, "__cl": cl} - lines = [] - post_lines = [] - - attrs = adapted_fields(cl) - is_dc = is_dataclass(cl) - - if any(isinstance(a.type, str) for a in attrs): - # PEP 563 annotations - need to be resolved. - resolve_types(cl) - - lines.append(f"def {fn_name}(o, *_):") - lines.append(" res = {") - allowed_fields = set() - for a in attrs: - an = a.name - override = kwargs.pop(an, _neutral) - if override.omit: - continue - t = a.type - if isinstance(t, TypeVar): - t = mapping.get(t.__name__, t) - elif is_generic(t) and not is_bare(t) and not is_annotated(t): - t = deep_copy_with(t, mapping) - - # For each attribute, we try resolving the type here and now. - # If a type is manually overwritten, this function should be - # regenerated. - if a.converter is not None and _cattrs_prefer_attrib_converters: - handler = None - elif ( - a.converter is not None - and not _cattrs_prefer_attrib_converters - and t is not None - ): - handler = converter._structure_func.dispatch(t) - if handler == converter._structure_error: - handler = None - elif t is not None: - handler = converter._structure_func.dispatch(t) - else: - handler = converter.structure - - struct_handler_name = f"structure_{an}" - globs[struct_handler_name] = handler - - ian = an if (is_dc or an[0] != "_") else an[1:] - kn = an if override.rename is None else override.rename - allowed_fields.add(kn) - globs[f"type_{an}"] = t - if a.default is NOTHING: - if handler: - lines.append( - f" '{ian}': {struct_handler_name}(o['{kn}'], type_{an})," - ) - else: - lines.append(f" '{ian}': o['{kn}'],") - else: - post_lines.append(f" if '{kn}' in o:") - if handler: - post_lines.append( - f" res['{ian}'] = {struct_handler_name}(o['{kn}'], type_{an})" - ) - else: - post_lines.append(f" res['{ian}'] = o['{kn}']") - - lines.append(" }") - if _cattrs_forbid_extra_keys: - globs["__c_a"] = allowed_fields - post_lines += [ - " unknown_fields = set(o.keys()) - __c_a", - " if unknown_fields:", - " raise Exception(", - f" 'Extra fields in constructor for {cl_name}: ' + ', '.join(unknown_fields)" - " )", - ] - - total_lines = lines + post_lines + [" return __cl(**res)"] - - fname = _generate_unique_filename( - cl, "structure", reserve=_cattrs_use_linecache - ) - script = "\n".join(total_lines) - eval(compile(script, fname, "exec"), globs) - if _cattrs_use_linecache: - linecache.cache[fname] = len(script), None, total_lines, fname - - return globs[fn_name] - - -def make_iterable_unstructure_fn(cl: Any, converter, unstructure_to=None): - """Generate a specialized unstructure function for an iterable.""" - handler = converter.unstructure - - fn_name = "unstructure_iterable" - - # Let's try fishing out the type args. - if getattr(cl, "__args__", None) is not None: - type_arg = cl.__args__[0] - # We don't know how to handle the TypeVar on this level, - # so we skip doing the dispatch here. - if not isinstance(type_arg, TypeVar): - handler = converter._unstructure_func.dispatch(type_arg) - - globs = {"__cattr_seq_cl": unstructure_to or cl, "__cattr_u": handler} - lines = [] - - lines.append(f"def {fn_name}(iterable):") - lines.append(" res = __cattr_seq_cl(__cattr_u(i) for i in iterable)") - - total_lines = lines + [" return res"] - - eval(compile("\n".join(total_lines), "", "exec"), globs) - - fn = globs[fn_name] - - return fn - - -def make_hetero_tuple_unstructure_fn(cl: Any, converter, unstructure_to=None): - """Generate a specialized unstructure function for a heterogenous tuple.""" - fn_name = "unstructure_tuple" - - type_args = get_args(cl) - - # We can do the dispatch here and now. - handlers = [ - converter._unstructure_func.dispatch(type_arg) - for type_arg in type_args - ] - - globs = {f"__cattr_u_{i}": h for i, h in enumerate(handlers)} - if unstructure_to is not tuple: - globs["__cattr_seq_cl"] = unstructure_to or cl - lines = [] - - lines.append(f"def {fn_name}(tup):") - if unstructure_to is not tuple: - lines.append(" res = __cattr_seq_cl((") - else: - lines.append(" res = (") - for i in range(len(handlers)): - if handlers[i] == converter._unstructure_identity: - lines.append(f" tup[{i}],") - else: - lines.append(f" __cattr_u_{i}(tup[{i}]),") - - if unstructure_to is not tuple: - lines.append(" ))") - else: - lines.append(" )") - - total_lines = lines + [" return res"] - - eval(compile("\n".join(total_lines), "", "exec"), globs) - - fn = globs[fn_name] - - return fn - - -def make_mapping_unstructure_fn( - cl: Any, converter, unstructure_to=None, key_handler=None -): - """Generate a specialized unstructure function for a mapping.""" - kh = key_handler or converter.unstructure - val_handler = converter.unstructure - - fn_name = "unstructure_mapping" - - # Let's try fishing out the type args. - if getattr(cl, "__args__", None) is not None: - args = get_args(cl) - if len(args) == 2: - key_arg, val_arg = args - else: - # Probably a Counter - key_arg, val_arg = args, Any - # We can do the dispatch here and now. - kh = key_handler or converter._unstructure_func.dispatch(key_arg) - if kh == converter._unstructure_identity: - kh = None - - val_handler = converter._unstructure_func.dispatch(val_arg) - if val_handler == converter._unstructure_identity: - val_handler = None - - globs = { - "__cattr_mapping_cl": unstructure_to or cl, - "__cattr_k_u": kh, - "__cattr_v_u": val_handler, - } - - k_u = "__cattr_k_u(k)" if kh is not None else "k" - v_u = "__cattr_v_u(v)" if val_handler is not None else "v" - - lines = [] - - lines.append(f"def {fn_name}(mapping):") - lines.append( - f" res = __cattr_mapping_cl(({k_u}, {v_u}) for k, v in mapping.items())" - ) - - total_lines = lines + [" return res"] - - eval(compile("\n".join(total_lines), "", "exec"), globs) - - fn = globs[fn_name] - - return fn - - -def make_mapping_structure_fn( - cl: Any, converter, structure_to=dict, key_type=NOTHING, val_type=NOTHING -): - """Generate a specialized unstructure function for a mapping.""" - fn_name = "structure_mapping" - - globs = {"__cattr_mapping_cl": structure_to} - - lines = [] - lines.append(f"def {fn_name}(mapping, _):") - - # Let's try fishing out the type args. - if not is_bare(cl): - args = get_args(cl) - if len(args) == 2: - key_arg_cand, val_arg_cand = args - if key_type is NOTHING: - key_type = key_arg_cand - if val_type is NOTHING: - val_type = val_arg_cand - else: - if key_type is not NOTHING and val_type is NOTHING: - (val_type,) = args - elif key_type is NOTHING and val_type is not NOTHING: - (key_type,) = args - else: - # Probably a Counter - (key_type,) = args - val_type = Any - - is_bare_dict = val_type is Any and key_type is Any - if not is_bare_dict: - # We can do the dispatch here and now. - key_handler = converter._structure_func.dispatch(key_type) - if key_handler == converter._structure_call: - key_handler = key_type - - val_handler = converter._structure_func.dispatch(val_type) - if val_handler == converter._structure_call: - val_handler = val_type - - globs["__cattr_k_t"] = key_type - globs["__cattr_v_t"] = val_type - globs["__cattr_k_s"] = key_handler - globs["__cattr_v_s"] = val_handler - k_s = ( - "__cattr_k_s(k, __cattr_k_t)" - if key_handler != key_type - else "__cattr_k_s(k)" - ) - v_s = ( - "__cattr_v_s(v, __cattr_v_t)" - if val_handler != val_type - else "__cattr_v_s(v)" - ) - else: - is_bare_dict = True - - if is_bare_dict: - # No args, it's a bare dict. - lines.append(" res = dict(mapping)") - else: - lines.append(f" res = {{{k_s}: {v_s} for k, v in mapping.items()}}") - if structure_to is not dict: - lines.append(" res = __cattr_mapping_cl(res)") - - total_lines = lines + [" return res"] - - eval(compile("\n".join(total_lines), "", "exec"), globs) - - fn = globs[fn_name] - - return fn - - -def _generate_unique_filename(cls, func_name, reserve=True): - """ - Create a "filename" suitable for a function being generated. - """ - unique_id = uuid.uuid4() - extra = "" - count = 1 - - while True: - unique_filename = "".format( - func_name, - cls.__module__, - getattr(cls, "__qualname__", cls.__name__), - extra, - ) - if not reserve: - return unique_filename - # To handle concurrency we essentially "reserve" our spot in - # the linecache with a dummy line. The caller can then - # set this value correctly. - cache_line = (1, None, (str(unique_id),), unique_filename) - if ( - linecache.cache.setdefault(unique_filename, cache_line) - == cache_line - ): - return unique_filename - - # Looks like this spot is taken. Try again. - count += 1 - extra = "-{0}".format(count) +__all__ = [ + "AttributeOverride", + "make_dict_structure_fn", + "make_dict_unstructure_fn", + "make_hetero_tuple_unstructure_fn", + "make_iterable_unstructure_fn", + "make_mapping_structure_fn", + "make_mapping_unstructure_fn", + "override", +] diff --git a/src/cattrs/gen.py b/src/cattrs/gen.py index e5f53751..aedc4abf 100644 --- a/src/cattrs/gen.py +++ b/src/cattrs/gen.py @@ -1,21 +1,635 @@ -from cattr.gen import ( - AttributeOverride, - make_dict_structure_fn, - make_dict_unstructure_fn, - make_hetero_tuple_unstructure_fn, - make_iterable_unstructure_fn, - make_mapping_structure_fn, - make_mapping_unstructure_fn, - override, +import linecache +import re +import uuid +from dataclasses import is_dataclass +from threading import local +from typing import ( + TYPE_CHECKING, + Any, + Callable, + Dict, + Mapping, + Optional, + Type, + TypeVar, ) -__all__ = [ - "AttributeOverride", - "make_dict_structure_fn", - "make_dict_unstructure_fn", - "make_hetero_tuple_unstructure_fn", - "make_iterable_unstructure_fn", - "make_mapping_structure_fn", - "make_mapping_unstructure_fn", - "override", -] +import attr +from attr import NOTHING, frozen, resolve_types + +from cattr._compat import ( + adapted_fields, + get_args, + get_origin, + is_annotated, + is_bare, + is_generic, +) +from cattr._generics import deep_copy_with + +if TYPE_CHECKING: # pragma: no cover + from cattr.converters import Converter + + +@frozen +class AttributeOverride: + omit_if_default: Optional[bool] = None + rename: Optional[str] = None + omit: bool = False # Omit the field completely. + + +def override(omit_if_default=None, rename=None, omit: bool = False): + return AttributeOverride( + omit_if_default=omit_if_default, rename=rename, omit=omit + ) + + +_neutral = AttributeOverride() +_already_generating = local() +T = TypeVar("T") + + +def make_dict_unstructure_fn( + cl, + converter, + _cattrs_omit_if_default: bool = False, + _cattrs_use_linecache: bool = True, + **kwargs, +) -> Callable[[Any], Dict]: + """ + Generate a specialized dict unstructuring function for an attrs class or a + dataclass. + """ + origin = get_origin(cl) + attrs = adapted_fields(origin or cl) # type: ignore + + if any(isinstance(a.type, str) for a in attrs): + # PEP 563 annotations - need to be resolved. + resolve_types(cl) + + mapping = {} + if is_generic(cl): + mapping = _generate_mapping(cl, mapping) + + for base in getattr(origin, "__orig_bases__", ()): + if is_generic(base) and not str(base).startswith("typing.Generic"): + mapping = _generate_mapping(base, mapping) + break + cl = origin + + cl_name = cl.__name__ + fn_name = "unstructure_" + cl_name + globs = {} + lines = [] + invocation_lines = [] + internal_arg_parts = {} + + # We keep track of what we're generating to help with recursive + # class graphs. + try: + working_set = _already_generating.working_set + except AttributeError: + working_set = set() + _already_generating.working_set = working_set + if cl in working_set: + raise RecursionError() + else: + working_set.add(cl) + + try: + for a in attrs: + attr_name = a.name + override = kwargs.pop(attr_name, _neutral) + if override.omit: + continue + kn = attr_name if override.rename is None else override.rename + d = a.default + + # For each attribute, we try resolving the type here and now. + # If a type is manually overwritten, this function should be + # regenerated. + handler = None + if a.type is not None: + t = a.type + if isinstance(t, TypeVar): + if t.__name__ in mapping: + t = mapping[t.__name__] + else: + handler = converter.unstructure + elif is_generic(t) and not is_bare(t) and not is_annotated(t): + t = deep_copy_with(t, mapping) + + if handler is None: + try: + handler = converter._unstructure_func.dispatch(t) + except RecursionError: + # There's a circular reference somewhere down the line + handler = converter.unstructure + else: + handler = converter.unstructure + + is_identity = handler == converter._unstructure_identity + + if not is_identity: + unstruct_handler_name = f"__c_unstr_{attr_name}" + globs[unstruct_handler_name] = handler + internal_arg_parts[unstruct_handler_name] = handler + invoke = f"{unstruct_handler_name}(instance.{attr_name})" + else: + invoke = f"instance.{attr_name}" + + if d is not attr.NOTHING and ( + ( + _cattrs_omit_if_default + and override.omit_if_default is not False + ) + or override.omit_if_default + ): + def_name = f"__c_def_{attr_name}" + + if isinstance(d, attr.Factory): + globs[def_name] = d.factory + internal_arg_parts[def_name] = d.factory + if d.takes_self: + lines.append( + f" if instance.{attr_name} != {def_name}(instance):" + ) + else: + lines.append( + f" if instance.{attr_name} != {def_name}():" + ) + lines.append(f" res['{kn}'] = {invoke}") + else: + globs[def_name] = d + internal_arg_parts[def_name] = d + lines.append(f" if instance.{attr_name} != {def_name}:") + lines.append(f" res['{kn}'] = {invoke}") + + else: + # No default or no override. + invocation_lines.append(f"'{kn}': {invoke},") + + internal_arg_line = ", ".join([f"{i}={i}" for i in internal_arg_parts]) + if internal_arg_line: + internal_arg_line = f", {internal_arg_line}" + for k, v in internal_arg_parts.items(): + globs[k] = v + + total_lines = ( + [f"def {fn_name}(instance{internal_arg_line}):"] + + [" res = {"] + + [f" {line}" for line in invocation_lines] + + [" }"] + + lines + + [" return res"] + ) + script = "\n".join(total_lines) + + fname = _generate_unique_filename( + cl, "unstructure", reserve=_cattrs_use_linecache + ) + + eval(compile(script, fname, "exec"), globs) + + fn = globs[fn_name] + if _cattrs_use_linecache: + linecache.cache[fname] = len(script), None, total_lines, fname + finally: + working_set.remove(cl) + + return fn + + +def _generate_mapping( + cl: Type, old_mapping: Dict[str, type] +) -> Dict[str, type]: + mapping = {} + for p, t in zip(get_origin(cl).__parameters__, get_args(cl)): + if isinstance(t, TypeVar): + continue + mapping[p.__name__] = t + + if not mapping: + return old_mapping + + return mapping + + +def make_dict_structure_fn( + cl: Type[T], + converter: "Converter", + _cattrs_forbid_extra_keys: bool = False, + _cattrs_use_linecache: bool = True, + _cattrs_prefer_attrib_converters: bool = False, + **kwargs, +) -> Callable[[Mapping[str, Any], Any], T]: + """Generate a specialized dict structuring function for an attrs class.""" + + mapping = {} + if is_generic(cl): + base = get_origin(cl) + mapping = _generate_mapping(cl, mapping) + cl = base + + for base in getattr(cl, "__orig_bases__", ()): + if is_generic(base) and not str(base).startswith("typing.Generic"): + mapping = _generate_mapping(base, mapping) + break + + if isinstance(cl, TypeVar): + cl = mapping.get(cl.__name__, cl) + + cl_name = cl.__name__ + fn_name = "structure_" + cl_name + + # We have generic parameters and need to generate a unique name for the function + for p in getattr(cl, "__parameters__", ()): + # This is nasty, I am not sure how best to handle `typing.List[str]` or `TClass[int, int]` as a parameter type here + name_base = mapping[p.__name__] + name = getattr(name_base, "__name__", None) or str(name_base) + name = re.sub(r"[\[\.\] ,]", "_", name) + fn_name += f"_{name}" + + internal_arg_parts = {"__cl": cl} + globs = {} + lines = [] + post_lines = [] + invocation_lines = [] + + attrs = adapted_fields(cl) + is_dc = is_dataclass(cl) + + if any(isinstance(a.type, str) for a in attrs): + # PEP 563 annotations - need to be resolved. + resolve_types(cl) + + allowed_fields = set() + non_required = [] + # The first loop deals with required args. + for a in attrs: + an = a.name + override = kwargs.get(an, _neutral) + if override.omit: + continue + if a.default is not NOTHING: + non_required.append(a) + continue + t = a.type + if isinstance(t, TypeVar): + t = mapping.get(t.__name__, t) + elif is_generic(t) and not is_bare(t) and not is_annotated(t): + t = deep_copy_with(t, mapping) + + # For each attribute, we try resolving the type here and now. + # If a type is manually overwritten, this function should be + # regenerated. + if a.converter is not None and _cattrs_prefer_attrib_converters: + handler = None + elif ( + a.converter is not None + and not _cattrs_prefer_attrib_converters + and t is not None + ): + handler = converter._structure_func.dispatch(t) + if handler == converter._structure_error: + handler = None + elif t is not None: + handler = converter._structure_func.dispatch(t) + else: + handler = converter.structure + + struct_handler_name = f"__c_structure_{an}" + internal_arg_parts[struct_handler_name] = handler + + kn = an if override.rename is None else override.rename + allowed_fields.add(kn) + + if handler: + if handler == converter._structure_call: + internal_arg_parts[struct_handler_name] = t + invocation_lines.append(f"{struct_handler_name}(o['{kn}']),") + else: + type_name = f"__c_type_{an}" + internal_arg_parts[type_name] = t + invocation_lines.append( + f"{struct_handler_name}(o['{kn}'], {type_name})," + ) + else: + invocation_lines.append(f"o['{kn}'],") + + # The second loop is for optional args. + if non_required: + invocation_lines.append("**res,") + lines.append(" res = {}") + + for a in non_required: + an = a.name + override = kwargs.get(an, _neutral) + t = a.type + if isinstance(t, TypeVar): + t = mapping.get(t.__name__, t) + elif is_generic(t) and not is_bare(t) and not is_annotated(t): + t = deep_copy_with(t, mapping) + + # For each attribute, we try resolving the type here and now. + # If a type is manually overwritten, this function should be + # regenerated. + if a.converter is not None and _cattrs_prefer_attrib_converters: + handler = None + elif ( + a.converter is not None + and not _cattrs_prefer_attrib_converters + and t is not None + ): + handler = converter._structure_func.dispatch(t) + if handler == converter._structure_error: + handler = None + elif t is not None: + handler = converter._structure_func.dispatch(t) + else: + handler = converter.structure + + struct_handler_name = f"__c_structure_{an}" + internal_arg_parts[struct_handler_name] = handler + + ian = an if (is_dc or an[0] != "_") else an[1:] + kn = an if override.rename is None else override.rename + allowed_fields.add(kn) + post_lines.append(f" if '{kn}' in o:") + if handler: + if handler == converter._structure_call: + internal_arg_parts[struct_handler_name] = t + post_lines.append( + f" res['{ian}'] = {struct_handler_name}(o['{kn}'])" + ) + else: + type_name = f"__c_type_{an}" + internal_arg_parts[type_name] = t + post_lines.append( + f" res['{ian}'] = {struct_handler_name}(o['{kn}'], {type_name})" + ) + else: + post_lines.append(f" res['{ian}'] = o['{kn}']") + + if _cattrs_forbid_extra_keys: + globs["__c_a"] = allowed_fields + post_lines += [ + " unknown_fields = set(o.keys()) - __c_a", + " if unknown_fields:", + " raise Exception(", + f" 'Extra fields in constructor for {cl_name}: ' + ', '.join(unknown_fields)" + " )", + ] + + # At the end, we create the function header. + internal_arg_line = ", ".join([f"{i}={i}" for i in internal_arg_parts]) + for k, v in internal_arg_parts.items(): + globs[k] = v + + total_lines = ( + [f"def {fn_name}(o, _, *, {internal_arg_line}):"] + + lines + + post_lines + + [" return __cl("] + + [f" {line}" for line in invocation_lines] + + [" )"] + ) + + fname = _generate_unique_filename( + cl, "structure", reserve=_cattrs_use_linecache + ) + script = "\n".join(total_lines) + eval(compile(script, fname, "exec"), globs) + if _cattrs_use_linecache: + linecache.cache[fname] = len(script), None, total_lines, fname + + return globs[fn_name] + + +def make_iterable_unstructure_fn(cl: Any, converter, unstructure_to=None): + """Generate a specialized unstructure function for an iterable.""" + handler = converter.unstructure + + fn_name = "unstructure_iterable" + + # Let's try fishing out the type args. + if getattr(cl, "__args__", None) is not None: + type_arg = cl.__args__[0] + # We don't know how to handle the TypeVar on this level, + # so we skip doing the dispatch here. + if not isinstance(type_arg, TypeVar): + handler = converter._unstructure_func.dispatch(type_arg) + + globs = {"__cattr_seq_cl": unstructure_to or cl, "__cattr_u": handler} + lines = [] + + lines.append(f"def {fn_name}(iterable):") + lines.append(" res = __cattr_seq_cl(__cattr_u(i) for i in iterable)") + + total_lines = lines + [" return res"] + + eval(compile("\n".join(total_lines), "", "exec"), globs) + + fn = globs[fn_name] + + return fn + + +def make_hetero_tuple_unstructure_fn(cl: Any, converter, unstructure_to=None): + """Generate a specialized unstructure function for a heterogenous tuple.""" + fn_name = "unstructure_tuple" + + type_args = get_args(cl) + + # We can do the dispatch here and now. + handlers = [ + converter._unstructure_func.dispatch(type_arg) + for type_arg in type_args + ] + + globs = {f"__cattr_u_{i}": h for i, h in enumerate(handlers)} + if unstructure_to is not tuple: + globs["__cattr_seq_cl"] = unstructure_to or cl + lines = [] + + lines.append(f"def {fn_name}(tup):") + if unstructure_to is not tuple: + lines.append(" res = __cattr_seq_cl((") + else: + lines.append(" res = (") + for i in range(len(handlers)): + if handlers[i] == converter._unstructure_identity: + lines.append(f" tup[{i}],") + else: + lines.append(f" __cattr_u_{i}(tup[{i}]),") + + if unstructure_to is not tuple: + lines.append(" ))") + else: + lines.append(" )") + + total_lines = lines + [" return res"] + + eval(compile("\n".join(total_lines), "", "exec"), globs) + + fn = globs[fn_name] + + return fn + + +def make_mapping_unstructure_fn( + cl: Any, converter, unstructure_to=None, key_handler=None +): + """Generate a specialized unstructure function for a mapping.""" + kh = key_handler or converter.unstructure + val_handler = converter.unstructure + + fn_name = "unstructure_mapping" + + # Let's try fishing out the type args. + if getattr(cl, "__args__", None) is not None: + args = get_args(cl) + if len(args) == 2: + key_arg, val_arg = args + else: + # Probably a Counter + key_arg, val_arg = args, Any + # We can do the dispatch here and now. + kh = key_handler or converter._unstructure_func.dispatch(key_arg) + if kh == converter._unstructure_identity: + kh = None + + val_handler = converter._unstructure_func.dispatch(val_arg) + if val_handler == converter._unstructure_identity: + val_handler = None + + globs = { + "__cattr_mapping_cl": unstructure_to or cl, + "__cattr_k_u": kh, + "__cattr_v_u": val_handler, + } + + k_u = "__cattr_k_u(k)" if kh is not None else "k" + v_u = "__cattr_v_u(v)" if val_handler is not None else "v" + + lines = [] + + lines.append(f"def {fn_name}(mapping):") + lines.append( + f" res = __cattr_mapping_cl(({k_u}, {v_u}) for k, v in mapping.items())" + ) + + total_lines = lines + [" return res"] + + eval(compile("\n".join(total_lines), "", "exec"), globs) + + fn = globs[fn_name] + + return fn + + +def make_mapping_structure_fn( + cl: Any, converter, structure_to=dict, key_type=NOTHING, val_type=NOTHING +): + """Generate a specialized unstructure function for a mapping.""" + fn_name = "structure_mapping" + + globs = {"__cattr_mapping_cl": structure_to} + + lines = [] + lines.append(f"def {fn_name}(mapping, _):") + + # Let's try fishing out the type args. + if not is_bare(cl): + args = get_args(cl) + if len(args) == 2: + key_arg_cand, val_arg_cand = args + if key_type is NOTHING: + key_type = key_arg_cand + if val_type is NOTHING: + val_type = val_arg_cand + else: + if key_type is not NOTHING and val_type is NOTHING: + (val_type,) = args + elif key_type is NOTHING and val_type is not NOTHING: + (key_type,) = args + else: + # Probably a Counter + (key_type,) = args + val_type = Any + + is_bare_dict = val_type is Any and key_type is Any + if not is_bare_dict: + # We can do the dispatch here and now. + key_handler = converter._structure_func.dispatch(key_type) + if key_handler == converter._structure_call: + key_handler = key_type + + val_handler = converter._structure_func.dispatch(val_type) + if val_handler == converter._structure_call: + val_handler = val_type + + globs["__cattr_k_t"] = key_type + globs["__cattr_v_t"] = val_type + globs["__cattr_k_s"] = key_handler + globs["__cattr_v_s"] = val_handler + k_s = ( + "__cattr_k_s(k, __cattr_k_t)" + if key_handler != key_type + else "__cattr_k_s(k)" + ) + v_s = ( + "__cattr_v_s(v, __cattr_v_t)" + if val_handler != val_type + else "__cattr_v_s(v)" + ) + else: + is_bare_dict = True + + if is_bare_dict: + # No args, it's a bare dict. + lines.append(" res = dict(mapping)") + else: + lines.append(f" res = {{{k_s}: {v_s} for k, v in mapping.items()}}") + if structure_to is not dict: + lines.append(" res = __cattr_mapping_cl(res)") + + total_lines = lines + [" return res"] + + eval(compile("\n".join(total_lines), "", "exec"), globs) + + fn = globs[fn_name] + + return fn + + +def _generate_unique_filename(cls, func_name, reserve=True): + """ + Create a "filename" suitable for a function being generated. + """ + unique_id = uuid.uuid4() + extra = "" + count = 1 + + while True: + unique_filename = "".format( + func_name, + cls.__module__, + getattr(cls, "__qualname__", cls.__name__), + extra, + ) + if not reserve: + return unique_filename + # To handle concurrency we essentially "reserve" our spot in + # the linecache with a dummy line. The caller can then + # set this value correctly. + cache_line = (1, None, (str(unique_id),), unique_filename) + if ( + linecache.cache.setdefault(unique_filename, cache_line) + == cache_line + ): + return unique_filename + + # Looks like this spot is taken. Try again. + count += 1 + extra = "-{0}".format(count)