From b44155b73d5cf3b00e52a61ece1030128f72c447 Mon Sep 17 00:00:00 2001 From: Jakob Schlyter Date: Sat, 22 Jul 2023 21:26:43 -0700 Subject: [PATCH 1/5] check nbf/exp and add allowed_max_lifetime --- src/cryptojwt/jwt.py | 26 +++++++++++++++++++- tests/test_09_jwt.py | 56 +++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 80 insertions(+), 2 deletions(-) diff --git a/src/cryptojwt/jwt.py b/src/cryptojwt/jwt.py index f6c738e8..e6ad8f9e 100755 --- a/src/cryptojwt/jwt.py +++ b/src/cryptojwt/jwt.py @@ -1,6 +1,7 @@ """Basic JSON Web Token implementation.""" import json import logging +import time import uuid from datetime import datetime from datetime import timezone @@ -95,6 +96,7 @@ def __init__( allowed_sign_algs=None, allowed_enc_algs=None, allowed_enc_encs=None, + allowed_max_lifetime=None, zip="", ): self.key_jar = key_jar # KeyJar instance @@ -115,6 +117,7 @@ def __init__( self.allowed_sign_algs = allowed_sign_algs self.allowed_enc_algs = allowed_enc_algs self.allowed_enc_encs = allowed_enc_encs + self.allowed_max_lifetime = allowed_max_lifetime self.zip = zip def receiver_keys(self, recv, use): @@ -304,11 +307,12 @@ def verify_profile(msg_cls, info, **kwargs): raise VerificationError() return _msg - def unpack(self, token): + def unpack(self, token, timestamp=None): """ Unpack a received signed or signed and encrypted Json Web Token :param token: The Json Web Token + :param t: Time for evaluation (default now) :return: If decryption and signature verification work the payload will be returned as a Message instance if possible. """ @@ -378,6 +382,26 @@ def unpack(self, token): except KeyError: _msg_cls = None + timestamp = timestamp or time.time() + + if "nbf" in _info: + nbf = int(_info["nbf"]) + if timestamp < nbf - self.skew: + raise VerificationError("Token not yet valid") + + if "exp" in _info: + exp = int(_info["exp"]) + if timestamp >= exp + self.skew: + raise VerificationError("Token expired") + else: + exp = None + + if "iat" in _info: + iat = int(_info["iat"]) + if self.allowed_max_lifetime and exp: + if abs(exp - iat) > self.allowed_max_lifetime: + raise VerificationError("Token lifetime exceeded") + if _msg_cls: vp_args = {"skew": self.skew} if self.iss: diff --git a/tests/test_09_jwt.py b/tests/test_09_jwt.py index 2f645fe7..140d635d 100755 --- a/tests/test_09_jwt.py +++ b/tests/test_09_jwt.py @@ -5,6 +5,7 @@ from cryptojwt.exception import IssuerNotFound from cryptojwt.jws.exception import NoSuitableSigningKeys from cryptojwt.jwt import JWT +from cryptojwt.jwt import VerificationError, utc_time_sans_frac from cryptojwt.jwt import pick_key from cryptojwt.key_bundle import KeyBundle from cryptojwt.key_jar import KeyJar @@ -81,6 +82,59 @@ def test_jwt_pack_and_unpack(): assert set(info.keys()) == {"iat", "iss", "sub"} +def test_jwt_pack_and_unpack_valid(): + alice = JWT(key_jar=ALICE_KEY_JAR, iss=ALICE, sign_alg="RS256") + t = utc_time_sans_frac() + payload = {"sub": "sub", "nbf": t, "exp": t + 3600} + _jwt = alice.pack(payload=payload) + + bob = JWT(key_jar=BOB_KEY_JAR, iss=BOB, allowed_sign_algs=["RS256"]) + info = bob.unpack(_jwt) + + assert set(info.keys()) == {"iat", "iss", "sub", "nbf", "exp"} + + +def test_jwt_pack_and_unpack_not_yet_valid(): + lifetime = 3600 + skew = 15 + alice = JWT(key_jar=ALICE_KEY_JAR, iss=ALICE, sign_alg="RS256", lifetime=lifetime) + timestamp = utc_time_sans_frac() + payload = {"sub": "sub", "nbf": timestamp} + _jwt = alice.pack(payload=payload) + + bob = JWT(key_jar=BOB_KEY_JAR, iss=BOB, allowed_sign_algs=["RS256"], skew=skew) + _ = bob.unpack(_jwt, timestamp=timestamp - skew) + with pytest.raises(VerificationError): + _ = bob.unpack(_jwt, timestamp=timestamp - skew - 1) + + +def test_jwt_pack_and_unpack_expired(): + lifetime = 3600 + skew = 15 + alice = JWT(key_jar=ALICE_KEY_JAR, iss=ALICE, sign_alg="RS256", lifetime=lifetime) + payload = {"sub": "sub"} + _jwt = alice.pack(payload=payload) + + bob = JWT(key_jar=BOB_KEY_JAR, iss=BOB, allowed_sign_algs=["RS256"], skew=skew) + iat = bob.unpack(_jwt)["iat"] + _ = bob.unpack(_jwt, timestamp=iat + lifetime + skew - 1) + with pytest.raises(VerificationError): + _ = bob.unpack(_jwt, timestamp=iat + lifetime + skew) + + +def test_jwt_pack_and_unpack_max_lifetime_exceeded(): + lifetime = 3600 + alice = JWT(key_jar=ALICE_KEY_JAR, iss=ALICE, sign_alg="RS256", lifetime=lifetime) + payload = {"sub": "sub"} + _jwt = alice.pack(payload=payload) + + bob = JWT( + key_jar=BOB_KEY_JAR, iss=BOB, allowed_sign_algs=["RS256"], allowed_max_lifetime=lifetime - 1 + ) + with pytest.raises(VerificationError): + _ = bob.unpack(_jwt) + + def test_jwt_pack_and_unpack_unknown_issuer(): alice = JWT(key_jar=ALICE_KEY_JAR, iss=ALICE, sign_alg="RS256") payload = {"sub": "sub"} @@ -261,4 +315,4 @@ def test_eddsa_jwt(): kj = KeyJar() kj.add_kb(ISSUER, KeyBundle(JWKS_DICT)) jwt = JWT(key_jar=kj) - _ = jwt.unpack(JWT_TEST) + _ = jwt.unpack(JWT_TEST, timestamp=1655278809) From b9a0c0fa57ce4d16b38b2e15d08de5e05f6c6736 Mon Sep 17 00:00:00 2001 From: Jakob Schlyter Date: Sat, 22 Jul 2023 21:33:13 -0700 Subject: [PATCH 2/5] allow iat override for JWT pack --- src/cryptojwt/jwt.py | 9 +++++---- tests/test_09_jwt.py | 26 ++++++++++++++++++++------ 2 files changed, 25 insertions(+), 10 deletions(-) diff --git a/src/cryptojwt/jwt.py b/src/cryptojwt/jwt.py index e6ad8f9e..a62af67b 100755 --- a/src/cryptojwt/jwt.py +++ b/src/cryptojwt/jwt.py @@ -179,13 +179,13 @@ def put_together_aud(recv, aud=None): return _aud - def pack_init(self, recv, aud): + def pack_init(self, recv, aud, iat=None): """ Gather initial information for the payload. :return: A dictionary with claims and values """ - argv = {"iss": self.iss, "iat": utc_time_sans_frac()} + argv = {"iss": self.iss, "iat": iat or utc_time_sans_frac()} if self.lifetime: argv["exp"] = argv["iat"] + self.lifetime @@ -210,7 +210,7 @@ def pack_key(self, issuer_id="", kid=""): return keys[0] # Might be more then one if kid == '' - def pack(self, payload=None, kid="", issuer_id="", recv="", aud=None, **kwargs): + def pack(self, payload=None, kid="", issuer_id="", recv="", aud=None, iat=None, **kwargs): """ :param payload: Information to be carried as payload in the JWT @@ -219,13 +219,14 @@ def pack(self, payload=None, kid="", issuer_id="", recv="", aud=None, **kwargs): :param recv: The intended immediate receiver :param aud: Intended audience for this JWS/JWE, not expected to contain the recipient. + :param iat: Override issued at (default current timestamp) :param kwargs: Extra keyword arguments :return: A signed or signed and encrypted Json Web Token """ _args = {} if payload is not None: _args.update(payload) - _args.update(self.pack_init(recv, aud)) + _args.update(self.pack_init(recv, aud, iat)) try: _encrypt = kwargs["encrypt"] diff --git a/tests/test_09_jwt.py b/tests/test_09_jwt.py index 140d635d..94c7b106 100755 --- a/tests/test_09_jwt.py +++ b/tests/test_09_jwt.py @@ -135,15 +135,29 @@ def test_jwt_pack_and_unpack_max_lifetime_exceeded(): _ = bob.unpack(_jwt) -def test_jwt_pack_and_unpack_unknown_issuer(): - alice = JWT(key_jar=ALICE_KEY_JAR, iss=ALICE, sign_alg="RS256") +def test_jwt_pack_and_unpack_max_lifetime_exceeded(): + lifetime = 3600 + alice = JWT(key_jar=ALICE_KEY_JAR, iss=ALICE, sign_alg="RS256", lifetime=lifetime) payload = {"sub": "sub"} _jwt = alice.pack(payload=payload) - kj = KeyJar() - bob = JWT(key_jar=kj, iss=BOB, allowed_sign_algs=["RS256"]) - with pytest.raises(IssuerNotFound): - info = bob.unpack(_jwt) + bob = JWT( + key_jar=BOB_KEY_JAR, iss=BOB, allowed_sign_algs=["RS256"], allowed_max_lifetime=lifetime - 1 + ) + with pytest.raises(VerificationError): + _ = bob.unpack(_jwt) + + +def test_jwt_pack_and_unpack_timestamp(): + lifetime = 3600 + alice = JWT(key_jar=ALICE_KEY_JAR, iss=ALICE, sign_alg="RS256", lifetime=lifetime) + payload = {"sub": "sub"} + _jwt = alice.pack(payload=payload, iat=42) + + bob = JWT(key_jar=BOB_KEY_JAR, iss=BOB, allowed_sign_algs=["RS256"]) + _ = bob.unpack(_jwt, timestamp=42) + with pytest.raises(VerificationError): + _ = bob.unpack(_jwt) def test_jwt_pack_and_unpack_unknown_key(): From 428689aab5613aa685f5feac6bb122b0ac45c1f1 Mon Sep 17 00:00:00 2001 From: Jakob Schlyter Date: Sat, 22 Jul 2023 21:45:37 -0700 Subject: [PATCH 3/5] use utc_time_sans_frac --- src/cryptojwt/jwt.py | 3 +-- tests/test_09_jwt.py | 3 ++- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/cryptojwt/jwt.py b/src/cryptojwt/jwt.py index a62af67b..79d1e17b 100755 --- a/src/cryptojwt/jwt.py +++ b/src/cryptojwt/jwt.py @@ -1,7 +1,6 @@ """Basic JSON Web Token implementation.""" import json import logging -import time import uuid from datetime import datetime from datetime import timezone @@ -383,7 +382,7 @@ def unpack(self, token, timestamp=None): except KeyError: _msg_cls = None - timestamp = timestamp or time.time() + timestamp = timestamp or utc_time_sans_frac() if "nbf" in _info: nbf = int(_info["nbf"]) diff --git a/tests/test_09_jwt.py b/tests/test_09_jwt.py index 94c7b106..0bb912fd 100755 --- a/tests/test_09_jwt.py +++ b/tests/test_09_jwt.py @@ -5,8 +5,9 @@ from cryptojwt.exception import IssuerNotFound from cryptojwt.jws.exception import NoSuitableSigningKeys from cryptojwt.jwt import JWT -from cryptojwt.jwt import VerificationError, utc_time_sans_frac +from cryptojwt.jwt import VerificationError from cryptojwt.jwt import pick_key +from cryptojwt.jwt import utc_time_sans_frac from cryptojwt.key_bundle import KeyBundle from cryptojwt.key_jar import KeyJar from cryptojwt.key_jar import init_key_jar From aa29de18b9b949216d1a0a970779dd164a72e027 Mon Sep 17 00:00:00 2001 From: Jakob Schlyter Date: Sat, 22 Jul 2023 22:23:34 -0700 Subject: [PATCH 4/5] typo --- src/cryptojwt/jwt.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/cryptojwt/jwt.py b/src/cryptojwt/jwt.py index 79d1e17b..2df54fc8 100755 --- a/src/cryptojwt/jwt.py +++ b/src/cryptojwt/jwt.py @@ -312,7 +312,7 @@ def unpack(self, token, timestamp=None): Unpack a received signed or signed and encrypted Json Web Token :param token: The Json Web Token - :param t: Time for evaluation (default now) + :param timestamp: Time for evaluation (default now) :return: If decryption and signature verification work the payload will be returned as a Message instance if possible. """ From 9ff84a28b122b6a09296a9f75377f4195777457f Mon Sep 17 00:00:00 2001 From: Jakob Schlyter Date: Wed, 26 Jul 2023 09:55:58 +0200 Subject: [PATCH 5/5] use time.time() instead for datetime.now(timezone.utc).timestamp() for better performance --- src/cryptojwt/jwt.py | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/src/cryptojwt/jwt.py b/src/cryptojwt/jwt.py index 2df54fc8..e212c772 100755 --- a/src/cryptojwt/jwt.py +++ b/src/cryptojwt/jwt.py @@ -1,9 +1,8 @@ """Basic JSON Web Token implementation.""" import json import logging +import time import uuid -from datetime import datetime -from datetime import timezone from json import JSONDecodeError from .exception import HeaderError @@ -28,9 +27,7 @@ def utc_time_sans_frac(): :return: A number of seconds """ - - now_timestampt = int(datetime.now(timezone.utc).timestamp()) - return now_timestampt + return int(time.time()) def pick_key(keys, use, alg="", key_type="", kid=""):