|
2 | 2 |
|
3 | 3 | from __future__ import annotations
|
4 | 4 |
|
| 5 | +from io import BytesIO |
5 | 6 | import re
|
6 | 7 | import typing
|
7 | 8 | from collections import OrderedDict, UserList, defaultdict
|
|
21 | 22 | Iterable,
|
22 | 23 | List,
|
23 | 24 | Optional,
|
| 25 | + Sequence, |
24 | 26 | Set,
|
25 | 27 | Type,
|
26 | 28 | TypeVar,
|
|
41 | 43 | pass
|
42 | 44 |
|
43 | 45 | from cbor2 import (
|
| 46 | + CBORDecoder, |
44 | 47 | CBOREncoder,
|
45 | 48 | CBORSimpleValue,
|
46 | 49 | CBORTag,
|
@@ -199,6 +202,17 @@ def wrapper(cls, value: Primitive):
|
199 | 202 | CBORBase = TypeVar("CBORBase", bound="CBORSerializable")
|
200 | 203 |
|
201 | 204 |
|
| 205 | +class IndefiniteDecoder(CBORDecoder): |
| 206 | + def decode_array(self, subtype: int) -> Sequence[Any]: |
| 207 | + # Major tag 4 |
| 208 | + length = self._decode_length(subtype, allow_indefinite=True) |
| 209 | + |
| 210 | + if length is None: |
| 211 | + return IndefiniteList(super().decode_array(subtype=subtype)) |
| 212 | + else: |
| 213 | + return super().decode_array(subtype=subtype) |
| 214 | + |
| 215 | + |
202 | 216 | def default_encoder(
|
203 | 217 | encoder: CBOREncoder, value: Union[CBORSerializable, IndefiniteList]
|
204 | 218 | ):
|
@@ -265,7 +279,7 @@ class CBORSerializable:
|
265 | 279 | does not refer to itself, which could cause infinite loops.
|
266 | 280 | """
|
267 | 281 |
|
268 |
| - def to_shallow_primitive(self) -> Primitive: |
| 282 | + def to_shallow_primitive(self) -> Union[Primitive, CBORSerializable]: |
269 | 283 | """
|
270 | 284 | Convert the instance to a CBOR primitive. If the primitive is a container, e.g. list, dict, the type of
|
271 | 285 | its elements could be either a Primitive or a CBORSerializable.
|
@@ -516,7 +530,10 @@ def from_cbor(cls, payload: Union[str, bytes]) -> CBORSerializable:
|
516 | 530 | """
|
517 | 531 | if type(payload) is str:
|
518 | 532 | payload = bytes.fromhex(payload)
|
519 |
| - value = loads(payload) # type: ignore |
| 533 | + |
| 534 | + with BytesIO(payload) as fp: |
| 535 | + value = IndefiniteDecoder(fp).decode() |
| 536 | + |
520 | 537 | return cls.from_primitive(value)
|
521 | 538 |
|
522 | 539 | def __repr__(self):
|
@@ -580,10 +597,14 @@ def _restore_typed_primitive(
|
580 | 597 | raise DeserializeException(
|
581 | 598 | f"List types need exactly one type argument, but got {t_args}"
|
582 | 599 | )
|
583 |
| - t = t_args[0] |
584 |
| - if not isinstance(v, list): |
| 600 | + t_subtype = t_args[0] |
| 601 | + if not isinstance(v, (list, IndefiniteList)): |
585 | 602 | raise DeserializeException(f"Expected type list but got {type(v)}")
|
586 |
| - return IndefiniteList([_restore_typed_primitive(t, w) for w in v]) |
| 603 | + v_list = [_restore_typed_primitive(t_subtype, w) for w in v] |
| 604 | + if t == IndefiniteList: |
| 605 | + return IndefiniteList(v_list) |
| 606 | + else: |
| 607 | + return v_list |
587 | 608 | elif isclass(t) and t == ByteString:
|
588 | 609 | if not isinstance(v, bytes):
|
589 | 610 | raise DeserializeException(f"Expected type bytes but got {type(v)}")
|
|
0 commit comments