Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 24 additions & 8 deletions airflow/models/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import json
import logging
import warnings
from contextlib import suppress
from json import JSONDecodeError
from typing import Any
from urllib.parse import parse_qsl, quote, unquote, urlencode, urlsplit
Expand Down Expand Up @@ -471,21 +472,36 @@ def test_connection(self):

return status, message

@property
def extra_dejson(self) -> dict:
"""Returns the extra property by deserializing json."""
obj = {}
def get_extra_dejson(self, nested: bool = False) -> dict:
"""
Deserialize extra property to JSON.

:param nested: Determines whether nested structures are also deserialized into JSON (default False).
"""
extra = {}

if self.extra:
try:
obj = json.loads(self.extra)

if nested:
for key, value in json.loads(self.extra).items():
extra[key] = value
if isinstance(value, str):
with suppress(JSONDecodeError):
extra[key] = json.loads(value)
else:
extra = json.loads(self.extra)
except JSONDecodeError:
self.log.exception("Failed parsing the json for conn_id %s", self.conn_id)

# Mask sensitive keys from this list
mask_secret(obj)
mask_secret(extra)

return extra

return obj
@property
def extra_dejson(self) -> dict:
"""Returns the extra property by deserializing json."""
return self.get_extra_dejson()

@classmethod
def get_connection_from_secrets(cls, conn_id: str) -> Connection:
Expand Down
23 changes: 23 additions & 0 deletions tests/models/test_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -250,3 +250,26 @@ def test_get_uri(self, connection, expected_uri):
# string works as expected.
def test_sanitize_conn_id(self, connection, expected_conn_id):
assert connection.conn_id == expected_conn_id

def test_extra_dejson(self):
extra = (
'{"trust_env": false, "verify": false, "stream": true, "headers":'
'{\r\n "Content-Type": "application/json",\r\n "X-Requested-By": "Airflow"\r\n}}'
)
connection = Connection(
conn_id="pokeapi",
conn_type="http",
login="user",
password="pass",
host="https://pokeapi.co/",
port=100,
schema="https",
extra=extra,
)

assert connection.extra_dejson == {
"trust_env": False,
"verify": False,
"stream": True,
"headers": {"Content-Type": "application/json", "X-Requested-By": "Airflow"},
}