diff --git a/airflow/models/connection.py b/airflow/models/connection.py index 6e1435ebfad65..95855e15fe76e 100644 --- a/airflow/models/connection.py +++ b/airflow/models/connection.py @@ -20,6 +20,7 @@ import json import logging import warnings +from contextlib import suppress from json import JSONDecodeError from typing import Any from urllib.parse import parse_qsl, quote, unquote, urlencode, urlsplit @@ -471,21 +472,36 @@ def test_connection(self): return status, message - @property - def extra_dejson(self) -> dict: - """Returns the extra property by deserializing json.""" - obj = {} + def get_extra_dejson(self, nested: bool = False) -> dict: + """ + Deserialize extra property to JSON. + + :param nested: Determines whether nested structures are also deserialized into JSON (default False). + """ + extra = {} + if self.extra: try: - obj = json.loads(self.extra) - + if nested: + for key, value in json.loads(self.extra).items(): + extra[key] = value + if isinstance(value, str): + with suppress(JSONDecodeError): + extra[key] = json.loads(value) + else: + extra = json.loads(self.extra) except JSONDecodeError: self.log.exception("Failed parsing the json for conn_id %s", self.conn_id) # Mask sensitive keys from this list - mask_secret(obj) + mask_secret(extra) + + return extra - return obj + @property + def extra_dejson(self) -> dict: + """Returns the extra property by deserializing json.""" + return self.get_extra_dejson() @classmethod def get_connection_from_secrets(cls, conn_id: str) -> Connection: diff --git a/tests/models/test_connection.py b/tests/models/test_connection.py index 21e5682c8d8a2..3f7504713f9c4 100644 --- a/tests/models/test_connection.py +++ b/tests/models/test_connection.py @@ -250,3 +250,26 @@ def test_get_uri(self, connection, expected_uri): # string works as expected. def test_sanitize_conn_id(self, connection, expected_conn_id): assert connection.conn_id == expected_conn_id + + def test_extra_dejson(self): + extra = ( + '{"trust_env": false, "verify": false, "stream": true, "headers":' + '{\r\n "Content-Type": "application/json",\r\n "X-Requested-By": "Airflow"\r\n}}' + ) + connection = Connection( + conn_id="pokeapi", + conn_type="http", + login="user", + password="pass", + host="https://pokeapi.co/", + port=100, + schema="https", + extra=extra, + ) + + assert connection.extra_dejson == { + "trust_env": False, + "verify": False, + "stream": True, + "headers": {"Content-Type": "application/json", "X-Requested-By": "Airflow"}, + }