|  | 
|  | 1 | +from typing import Any, Dict, List, Union | 
|  | 2 | + | 
|  | 3 | +import yaml | 
|  | 4 | + | 
|  | 5 | +from datacontract.export.exporter import Exporter, _check_models_for_export | 
|  | 6 | +from datacontract.model.data_contract_specification import DataContractSpecification, Field, Model, Quality | 
|  | 7 | + | 
|  | 8 | + | 
class DqxKeys:
    """Key names used when reading and writing DQX check specifications."""

    # Entry of a quality rule that holds the DQX definition.
    SPECIFICATION = "specification"

    # Keys of the check mapping nested inside the specification.
    CHECK = "check"
    FUNCTION = "function"
    ARGUMENTS = "arguments"

    # Argument names that identify the column(s) a check targets.
    COL_NAME = "column"
    COL_NAMES = "for_each_column"
    COLUMNS = "columns"
|  | 17 | + | 
|  | 18 | + | 
class DqxExporter(Exporter):
    """Exporter that renders the quality rules of a data contract model as DQX YAML."""

    def export(
        self,
        data_contract: DataContractSpecification,
        model: Model,
        server: str,
        sql_server_type: str,
        export_args: Dict[str, Any],
    ) -> str:
        """
        Exports one model of a data contract to DQX format.

        Args:
            data_contract (DataContractSpecification): The data contract to export from.
            model (Model): Selector passed to ``_check_models_for_export`` to pick the model.
            server (str): Not used by this exporter.
            sql_server_type (str): Not used by this exporter.
            export_args (Dict[str, Any]): Not used by this exporter.

        Returns:
            str: YAML document with the DQX checks of the selected model.
        """
        # Only the resolved model object is needed; its name is discarded.
        _, selected_model = _check_models_for_export(data_contract, model, self.export_format)
        return to_dqx_yaml(selected_model)
|  | 33 | + | 
|  | 34 | + | 
def to_dqx_yaml(model_value: Model) -> str:
    """
    Serializes the quality checks of a model as a DQX YAML document.

    Args:
        model_value (Model): The model whose quality rules are exported.

    Returns:
        str: Block-style YAML listing the extracted quality rule specifications
            in extraction order, with keys kept in their original order.
    """
    return yaml.dump(
        extract_quality_rules(model_value),
        default_flow_style=False,
        allow_unicode=True,
        sort_keys=False,
    )
|  | 47 | + | 
|  | 48 | + | 
def process_quality_rule(rule: Quality, column_name: str) -> Dict[str, Any]:
    """
    Builds the DQX check specification for a single quality rule.

    The specification is read from the ``specification`` entry of ``rule.model_extra``.
    When a column path is given and the check arguments do not name a target column
    yet, the column path is injected into the arguments.

    Args:
        rule (Quality): The quality rule to process.
        column_name (str): The full path to the current column. Empty for rules that
            are not attached to a column; nothing is injected in that case.

    Returns:
        dict: The processed quality rule specification. It is a copy, so the rule
            stored in the data contract is left unchanged.

    Raises:
        KeyError: If the rule has no ``specification`` entry, the specification has no
            ``check`` entry, or a column must be injected and the check has no
            ``function`` entry.
    """
    import copy

    # Work on a copy so that exporting never rewrites the rule held by the contract.
    specification = copy.deepcopy(rule.model_extra[DqxKeys.SPECIFICATION])
    check = specification[DqxKeys.CHECK]

    if not column_name:
        # No column path to inject (rule attached above column level); adding an
        # empty "column" argument would make the check point at a nameless column.
        return specification

    arguments = check.get(DqxKeys.ARGUMENTS)
    if arguments is None:
        # Covers both a missing key and an explicit null value.
        arguments = check[DqxKeys.ARGUMENTS] = {}

    column_keys = (DqxKeys.COL_NAME, DqxKeys.COL_NAMES, DqxKeys.COLUMNS)
    if not any(key in arguments for key in column_keys):
        if check[DqxKeys.FUNCTION] in ("is_unique", "foreign_key"):
            # These functions receive their target as a list under "columns".
            arguments[DqxKeys.COLUMNS] = [column_name]
        else:
            arguments[DqxKeys.COL_NAME] = column_name

    return specification
|  | 73 | + | 
|  | 74 | + | 
def extract_quality_rules(data: Union[Model, Field, Quality], column_path: str = "") -> List[Dict[str, Any]]:
    """
    Recursively extracts all quality rules from a data contract structure.

    For a model or field, the rules of its nested fields come first (in field order),
    followed by the rules attached directly to ``data`` itself. A ``fields`` or
    ``quality`` attribute that is ``None`` is treated as empty.

    Args:
        data (Union[Model, Field, Quality]): The data contract model, field, or quality rule.
        column_path (str, optional): The current path in the schema hierarchy. Defaults to "".

    Returns:
        List[Dict[str, Any]]: A list of quality rule specifications. Empty when ``data``
            is none of the supported types.
    """
    if isinstance(data, Quality):
        return [process_quality_rule(data, column_path)]

    if not isinstance(data, (Model, Field)):
        return []

    quality_rules: List[Dict[str, Any]] = []

    for key, field in (data.fields or {}).items():
        current_path = build_column_path(column_path, key)

        if field.fields:
            # Struct-like field: the recursive call also handles the field's own rules.
            quality_rules.extend(extract_quality_rules(field, current_path))
        else:
            # Leaf field: its rules apply to the column at current_path.
            quality_rules.extend(process_quality_rule(rule, current_path) for rule in field.quality or [])

    # Rules attached directly to this level use the path of this level itself.
    quality_rules.extend(process_quality_rule(rule, column_path) for rule in data.quality or [])

    return quality_rules
|  | 108 | + | 
|  | 109 | + | 
def build_column_path(current_path: str, key: str) -> str:
    """
    Returns the dotted path of a field given the path of its parent.

    Args:
        current_path (str): Path of the parent; empty when the field is at the top level.
        key (str): Name of the field.

    Returns:
        str: ``key`` alone when there is no parent path, otherwise
            ``current_path`` and ``key`` joined by a dot.
    """
    if not current_path:
        return key
    return f"{current_path}.{key}"
0 commit comments