Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
161 changes: 102 additions & 59 deletions datacontract/engines/data_contract_checks.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import re
import uuid
from dataclasses import dataclass
from typing import List
from venv import logger

Expand All @@ -9,6 +11,12 @@
from datacontract.model.run import Check


@dataclass
class QuotingConfig:
quote_field_name: bool = False
quote_model_name: bool = False


def create_checks(data_contract_spec: DataContractSpecification, server: Server) -> List[Check]:
checks: List[Check] = []
for model_key, model_value in data_contract_spec.models.items():
Expand All @@ -26,37 +34,41 @@ def to_model_checks(model_key, model_value, server: Server) -> List[Check]:
fields = model_value.fields

check_types = is_check_types(server)
quote_field_name = server_type in ["postgres", "sqlserver"]

quoting_config = QuotingConfig(
quote_field_name=server_type in ["postgres", "sqlserver"],
quote_model_name=server_type in ["postgres", "sqlserver"],
)

for field_name, field in fields.items():
checks.append(check_field_is_present(model_name, field_name, quote_field_name))
checks.append(check_field_is_present(model_name, field_name, quoting_config))
if check_types and field.type is not None:
sql_type = convert_to_sql_type(field, server_type)
checks.append(check_field_type(model_name, field_name, sql_type, quote_field_name))
checks.append(check_field_type(model_name, field_name, sql_type, quoting_config))
if field.required:
checks.append(check_field_required(model_name, field_name, quote_field_name))
checks.append(check_field_required(model_name, field_name, quoting_config))
if field.unique:
checks.append(check_field_unique(model_name, field_name, quote_field_name))
checks.append(check_field_unique(model_name, field_name, quoting_config))
if field.minLength is not None:
checks.append(check_field_min_length(model_name, field_name, field.minLength, quote_field_name))
checks.append(check_field_min_length(model_name, field_name, field.minLength, quoting_config))
if field.maxLength is not None:
checks.append(check_field_max_length(model_name, field_name, field.maxLength, quote_field_name))
checks.append(check_field_max_length(model_name, field_name, field.maxLength, quoting_config))
if field.minimum is not None:
checks.append(check_field_minimum(model_name, field_name, field.minimum, quote_field_name))
checks.append(check_field_minimum(model_name, field_name, field.minimum, quoting_config))
if field.maximum is not None:
checks.append(check_field_maximum(model_name, field_name, field.maximum, quote_field_name))
checks.append(check_field_maximum(model_name, field_name, field.maximum, quoting_config))
if field.exclusiveMinimum is not None:
checks.append(check_field_minimum(model_name, field_name, field.exclusiveMinimum, quote_field_name))
checks.append(check_field_not_equal(model_name, field_name, field.exclusiveMinimum, quote_field_name))
checks.append(check_field_minimum(model_name, field_name, field.exclusiveMinimum, quoting_config))
checks.append(check_field_not_equal(model_name, field_name, field.exclusiveMinimum, quoting_config))
if field.exclusiveMaximum is not None:
checks.append(check_field_maximum(model_name, field_name, field.exclusiveMaximum, quote_field_name))
checks.append(check_field_not_equal(model_name, field_name, field.exclusiveMaximum, quote_field_name))
checks.append(check_field_maximum(model_name, field_name, field.exclusiveMaximum, quoting_config))
checks.append(check_field_not_equal(model_name, field_name, field.exclusiveMaximum, quoting_config))
if field.pattern is not None:
checks.append(check_field_regex(model_name, field_name, field.pattern, quote_field_name))
checks.append(check_field_regex(model_name, field_name, field.pattern, quoting_config))
if field.enum is not None and len(field.enum) > 0:
checks.append(check_field_enum(model_name, field_name, field.enum, quote_field_name))
checks.append(check_field_enum(model_name, field_name, field.enum, quoting_config))
if field.quality is not None and len(field.quality) > 0:
quality_list = check_quality_list(model_name, field_name, field.quality)
quality_list = check_quality_list(model_name, field_name, field.quality, quoting_config)
if (quality_list is not None) and len(quality_list) > 0:
checks.extend(quality_list)
# TODO references: str = None
Expand All @@ -70,8 +82,8 @@ def to_model_checks(model_key, model_value, server: Server) -> List[Check]:
return checks


def checks_for(model_name, quote_field_name):
if quote_field_name:
def checks_for(model_name, quote_model_name: bool):
if quote_model_name:
return f'checks for "{model_name}"'
return f"checks for {model_name}"

Expand All @@ -98,11 +110,11 @@ def to_model_name(model_key, model_value, server_type):
return model_key


def check_field_is_present(model_name, field_name, quote_field_name: bool) -> Check:
def check_field_is_present(model_name, field_name, quoting_config: QuotingConfig = QuotingConfig()) -> Check:
check_type = "field_is_present"
check_key = f"{model_name}__{field_name}__{check_type}"
sodacl_check_dict = {
checks_for(model_name, quote_field_name): [
checks_for(model_name, quoting_config.quote_model_name): [
{
"schema": {
"name": check_key,
Expand All @@ -127,11 +139,13 @@ def check_field_is_present(model_name, field_name, quote_field_name: bool) -> Ch
)


def check_field_type(model_name: str, field_name: str, expected_type: str, quote_field_name: bool = False):
def check_field_type(
model_name: str, field_name: str, expected_type: str, quoting_config: QuotingConfig = QuotingConfig()
):
check_type = "field_type"
check_key = f"{model_name}__{field_name}__{check_type}"
sodacl_check_dict = {
checks_for(model_name, quote_field_name): [
checks_for(model_name, quoting_config.quote_model_name): [
{
"schema": {
"name": check_key,
Expand All @@ -158,16 +172,16 @@ def check_field_type(model_name: str, field_name: str, expected_type: str, quote
)


def check_field_required(model_name: str, field_name: str, quote_field_name: bool = False):
if quote_field_name:
def check_field_required(model_name: str, field_name: str, quoting_config: QuotingConfig = QuotingConfig()):
if quoting_config.quote_field_name:
field_name_for_soda = f'"{field_name}"'
else:
field_name_for_soda = field_name

check_type = "field_required"
check_key = f"{model_name}__{field_name}__{check_type}"
sodacl_check_dict = {
checks_for(model_name, quote_field_name): [
checks_for(model_name, quoting_config.quote_model_name): [
{
f"missing_count({field_name_for_soda}) = 0": {
"name": check_key,
Expand All @@ -189,16 +203,16 @@ def check_field_required(model_name: str, field_name: str, quote_field_name: boo
)


def check_field_unique(model_name: str, field_name: str, quote_field_name: bool = False):
if quote_field_name:
def check_field_unique(model_name: str, field_name: str, quoting_config: QuotingConfig = QuotingConfig()):
if quoting_config.quote_field_name:
field_name_for_soda = f'"{field_name}"'
else:
field_name_for_soda = field_name

check_type = "field_unique"
check_key = f"{model_name}__{field_name}__{check_type}"
sodacl_check_dict = {
checks_for(model_name, quote_field_name): [
checks_for(model_name, quoting_config.quote_model_name): [
{
f"duplicate_count({field_name_for_soda}) = 0": {
"name": check_key,
Expand All @@ -220,16 +234,18 @@ def check_field_unique(model_name: str, field_name: str, quote_field_name: bool
)


def check_field_min_length(model_name: str, field_name: str, min_length: int, quote_field_name: bool = False):
if quote_field_name:
def check_field_min_length(
model_name: str, field_name: str, min_length: int, quoting_config: QuotingConfig = QuotingConfig()
):
if quoting_config.quote_field_name:
field_name_for_soda = f'"{field_name}"'
else:
field_name_for_soda = field_name

check_type = "field_min_length"
check_key = f"{model_name}__{field_name}__{check_type}"
sodacl_check_dict = {
checks_for(model_name, quote_field_name): [
checks_for(model_name, quoting_config.quote_model_name): [
{
f"invalid_count({field_name_for_soda}) = 0": {
"name": check_key,
Expand All @@ -252,16 +268,18 @@ def check_field_min_length(model_name: str, field_name: str, min_length: int, qu
)


def check_field_max_length(model_name: str, field_name: str, max_length: int, quote_field_name: bool = False):
if quote_field_name:
def check_field_max_length(
model_name: str, field_name: str, max_length: int, quoting_config: QuotingConfig = QuotingConfig()
):
if quoting_config.quote_field_name:
field_name_for_soda = f'"{field_name}"'
else:
field_name_for_soda = field_name

check_type = "field_max_length"
check_key = f"{model_name}__{field_name}__{check_type}"
sodacl_check_dict = {
checks_for(model_name, quote_field_name): [
checks_for(model_name, quoting_config.quote_model_name): [
{
f"invalid_count({field_name_for_soda}) = 0": {
"name": check_key,
Expand All @@ -284,16 +302,18 @@ def check_field_max_length(model_name: str, field_name: str, max_length: int, qu
)


def check_field_minimum(model_name: str, field_name: str, minimum: int, quote_field_name: bool = False):
if quote_field_name:
def check_field_minimum(
model_name: str, field_name: str, minimum: int, quoting_config: QuotingConfig = QuotingConfig()
):
if quoting_config.quote_field_name:
field_name_for_soda = f'"{field_name}"'
else:
field_name_for_soda = field_name

check_type = "field_minimum"
check_key = f"{model_name}__{field_name}__{check_type}"
sodacl_check_dict = {
checks_for(model_name, quote_field_name): [
checks_for(model_name, quoting_config.quote_model_name): [
{
f"invalid_count({field_name_for_soda}) = 0": {
"name": check_key,
Expand All @@ -316,16 +336,18 @@ def check_field_minimum(model_name: str, field_name: str, minimum: int, quote_fi
)


def check_field_maximum(model_name: str, field_name: str, maximum: int, quote_field_name: bool = False):
if quote_field_name:
def check_field_maximum(
model_name: str, field_name: str, maximum: int, quoting_config: QuotingConfig = QuotingConfig()
):
if quoting_config.quote_field_name:
field_name_for_soda = f'"{field_name}"'
else:
field_name_for_soda = field_name

check_type = "field_maximum"
check_key = f"{model_name}__{field_name}__{check_type}"
sodacl_check_dict = {
checks_for(model_name, quote_field_name): [
checks_for(model_name, quoting_config.quote_model_name): [
{
f"invalid_count({field_name_for_soda}) = 0": {
"name": check_key,
Expand All @@ -348,16 +370,18 @@ def check_field_maximum(model_name: str, field_name: str, maximum: int, quote_fi
)


def check_field_not_equal(model_name: str, field_name: str, value: int, quote_field_name: bool = False):
if quote_field_name:
def check_field_not_equal(
model_name: str, field_name: str, value: int, quoting_config: QuotingConfig = QuotingConfig()
):
if quoting_config.quote_field_name:
field_name_for_soda = f'"{field_name}"'
else:
field_name_for_soda = field_name

check_type = "field_not_equal"
check_key = f"{model_name}__{field_name}__{check_type}"
sodacl_check_dict = {
checks_for(model_name, quote_field_name): [
checks_for(model_name, quoting_config.quote_model_name): [
{
f"invalid_count({field_name_for_soda}) = 0": {
"name": check_key,
Expand All @@ -380,16 +404,16 @@ def check_field_not_equal(model_name: str, field_name: str, value: int, quote_fi
)


def check_field_enum(model_name: str, field_name: str, enum: list, quote_field_name: bool = False):
if quote_field_name:
def check_field_enum(model_name: str, field_name: str, enum: list, quoting_config: QuotingConfig = QuotingConfig()):
if quoting_config.quote_field_name:
field_name_for_soda = f'"{field_name}"'
else:
field_name_for_soda = field_name

check_type = "field_enum"
check_key = f"{model_name}__{field_name}__{check_type}"
sodacl_check_dict = {
checks_for(model_name, quote_field_name): [
checks_for(model_name, quoting_config.quote_model_name): [
{
f"invalid_count({field_name_for_soda}) = 0": {
"name": check_key,
Expand All @@ -412,16 +436,16 @@ def check_field_enum(model_name: str, field_name: str, enum: list, quote_field_n
)


def check_field_regex(model_name: str, field_name: str, pattern: str, quote_field_name: bool = False):
if quote_field_name:
def check_field_regex(model_name: str, field_name: str, pattern: str, quoting_config: QuotingConfig = QuotingConfig()):
if quoting_config.quote_field_name:
field_name_for_soda = f'"{field_name}"'
else:
field_name_for_soda = field_name

check_type = "field_regex"
check_key = f"{model_name}__{field_name}__{check_type}"
sodacl_check_dict = {
checks_for(model_name, quote_field_name): [
checks_for(model_name, quoting_config.quote_model_name): [
{
f"invalid_count({field_name_for_soda}) = 0": {
"name": check_key,
Expand All @@ -444,7 +468,9 @@ def check_field_regex(model_name: str, field_name: str, pattern: str, quote_fiel
)


def check_quality_list(model_name, field_name, quality_list: List[Quality]) -> List[Check]:
def check_quality_list(
model_name, field_name, quality_list: List[Quality], quoting_config: QuotingConfig = QuotingConfig()
) -> List[Check]:
checks: List[Check] = []

count = 0
Expand All @@ -457,15 +483,20 @@ def check_quality_list(model_name, field_name, quality_list: List[Quality]) -> L
check_key = f"{model_name}__{field_name}__quality_sql_{count}"
check_type = "model_quality_sql"
threshold = to_sodacl_threshold(quality)
query = prepare_query(quality, model_name, field_name)
query = prepare_query(quality, model_name, field_name, quoting_config)
if query is None:
logger.warning(f"Quality check {check_key} has no query")
continue
if threshold is None:
logger.warning(f"Quality check {check_key} has no valid threshold")
continue

if quoting_config.quote_model_name:
model_name_for_soda = f'"{model_name}"'
else:
model_name_for_soda = model_name
sodacl_check_dict = {
f"checks for {model_name}": [
f"checks for {model_name_for_soda}": [
{
f"{check_key} {threshold}": {
f"{check_key} query": query,
Expand Down Expand Up @@ -493,22 +524,34 @@ def check_quality_list(model_name, field_name, quality_list: List[Quality]) -> L
return checks


def prepare_query(quality: Quality, model_name: str, field_name: str = None) -> str | None:
def prepare_query(
quality: Quality, model_name: str, field_name: str = None, quoting_config: QuotingConfig = QuotingConfig()
) -> str | None:
if quality.query is None:
return None
if quality.query == "":
return None

query = quality.query

query = query.replace("{model}", model_name)
query = query.replace("{schema}", model_name)
query = query.replace("{table}", model_name)
if quoting_config.quote_field_name:
field_name_for_soda = f'"{field_name}"'
else:
field_name_for_soda = field_name

if quoting_config.quote_model_name:
model_name_for_soda = f'"{model_name}"'
else:
model_name_for_soda = model_name

query = re.sub(r'["\']?\{model}["\']?', model_name_for_soda, query)
query = re.sub(r'["\']?{schema}["\']?', model_name_for_soda, query)
query = re.sub(r'["\']?{table}["\']?', model_name_for_soda, query)

if field_name is not None:
query = query.replace("{field}", field_name)
query = query.replace("{column}", field_name)
query = query.replace("{property}", field_name)
query = re.sub(r'["\']?{field}["\']?', field_name_for_soda, query)
query = re.sub(r'["\']?{column}["\']?', field_name_for_soda, query)
query = re.sub(r'["\']?{property}["\']?', field_name_for_soda, query)

return query

Expand Down
Loading