Skip to content

Commit dc10839

Browse files
authored
feat: implement validators, validation strategies, and RemapField transformer (#533)
1 parent 9ad1bc0 commit dc10839

File tree

17 files changed

+1071
-0
lines changed

17 files changed

+1071
-0
lines changed
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
#
2+
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
3+
#
4+
5+
from .add_fields import ConfigAddFields
6+
from .remap_field import ConfigRemapField
7+
from .remove_fields import ConfigRemoveFields
8+
9+
__all__ = ["ConfigRemapField", "ConfigAddFields", "ConfigRemoveFields"]
Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,119 @@
1+
#
2+
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
3+
#
4+
5+
from dataclasses import dataclass, field
6+
from typing import Any, List, MutableMapping, Optional, Type, Union
7+
8+
import dpath
9+
10+
from airbyte_cdk.sources.declarative.interpolation.interpolated_boolean import InterpolatedBoolean
11+
from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString
12+
from airbyte_cdk.sources.declarative.transformations.config_transformations.config_transformation import (
13+
ConfigTransformation,
14+
)
15+
from airbyte_cdk.sources.types import FieldPointer
16+
17+
18+
@dataclass(frozen=True)
class AddedFieldDefinition:
    """
    Describes one field to add to a config, as supplied by the caller.

    Instances are immutable (frozen dataclass).
    """

    # Location of the field inside the config, e.g. ["path", "to", "field"].
    path: FieldPointer
    # Value to write: either a ready-made InterpolatedString or a plain string.
    value: Union[InterpolatedString, str]
    # Optional type associated with the value; None when not specified.
    value_type: Optional[Type[Any]] = None
25+
26+
27+
@dataclass(frozen=True)
class ParsedAddFieldDefinition:
    """
    Normalized counterpart of AddedFieldDefinition.

    Same shape as AddedFieldDefinition, except that `value` is annotated as an
    InterpolatedString only. Instances are immutable (frozen dataclass).
    """

    # Location of the field inside the config, e.g. ["path", "to", "field"].
    path: FieldPointer
    # Interpolated value to write at `path`.
    value: InterpolatedString
    # Optional type associated with the value; None when not specified.
    value_type: Optional[Type[Any]] = None
34+
35+
36+
@dataclass
class ConfigAddFields(ConfigTransformation):
    """
    Config transformation that writes new fields into a config.

    Field paths may be nested; missing parent objects are created along the way
    (like mkdir -p). Each value is evaluated against the config being transformed.

    Examples of instantiating this transformation via YAML:
    - type: ConfigAddFields
      fields:
        # hardcoded constant
        - path: ["path"]
          value: "static_value"

        # nested path
        - path: ["path", "to", "field"]
          value: "static"

        # from config
        - path: ["derived_field"]
          value: "{{ config.original_field }}"

        # by supplying any valid Jinja template directive or expression
        - path: ["two_times_two"]
          value: "{{ 2 * 2 }}"

    Attributes:
        fields (List[AddedFieldDefinition]): the (path, value) pairs to add to the config
        condition (str): optional interpolated boolean; when non-empty, a field is only
            written if the condition evaluates truthy for that field
    """

    fields: List[AddedFieldDefinition]
    condition: str = ""
    _parsed_fields: List[ParsedAddFieldDefinition] = field(
        init=False, repr=False, default_factory=list
    )

    def __post_init__(self) -> None:
        """
        Build the condition interpolator and normalize every field definition.

        :raises ValueError: If a definition has an empty path, or a value that is neither
            an InterpolatedString nor a str
        """
        self._filter_interpolator = InterpolatedBoolean(condition=self.condition, parameters={})

        for add_field in self.fields:
            if len(add_field.path) < 1:
                raise ValueError(
                    f"Expected a non-zero-length path for the AddFields transformation {add_field}"
                )

            raw_value = add_field.value
            if isinstance(raw_value, InterpolatedString):
                # Already interpolated: use as-is.
                interpolated_value = raw_value
            elif isinstance(raw_value, str):
                interpolated_value = InterpolatedString.create(raw_value, parameters={})
            else:
                raise ValueError(
                    f"Expected a string value for the AddFields transformation: {add_field}"
                )

            self._parsed_fields.append(
                ParsedAddFieldDefinition(
                    add_field.path,
                    interpolated_value,
                    value_type=add_field.value_type,
                )
            )

    def transform(self, config: MutableMapping[str, Any]) -> None:
        """
        Add every configured field to `config`, mutating it in place.

        :param config: The user-provided configuration to be transformed
        """
        for parsed_field in self._parsed_fields:
            valid_types = (parsed_field.value_type,) if parsed_field.value_type else None
            value = parsed_field.value.eval(config, valid_types=valid_types)

            # The condition (when set) sees the evaluated value and the target path,
            # so it is checked after the value has been computed.
            if self.condition and not self._filter_interpolator.eval(
                config, value=value, path=parsed_field.path
            ):
                continue

            dpath.new(config, parsed_field.path, value)
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
#
2+
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
3+
#
4+
5+
from abc import ABC, abstractmethod
6+
from typing import Any, MutableMapping
7+
8+
9+
class ConfigTransformation(ABC):
    """
    Abstract base for transformations that can be applied to source configurations.

    Subclasses must implement `transform`.
    """

    @abstractmethod
    def transform(self, config: MutableMapping[str, Any]) -> None:
        """
        Apply the transformation to `config` in place.

        Implementations add, delete, or mutate fields directly on the config reference
        passed in; nothing is returned.

        :param config: The user-provided configuration to be transformed
        """
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
#
2+
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
3+
#
4+
5+
from dataclasses import dataclass, field
6+
from typing import Any, List, Mapping, MutableMapping, Union
7+
8+
from airbyte_cdk.sources.declarative.interpolation.interpolated_boolean import InterpolatedBoolean
9+
from airbyte_cdk.sources.declarative.interpolation.interpolated_mapping import InterpolatedMapping
10+
from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString
11+
from airbyte_cdk.sources.declarative.transformations.config_transformations.config_transformation import (
12+
ConfigTransformation,
13+
)
14+
15+
16+
@dataclass
class ConfigRemapField(ConfigTransformation):
    """
    Transformation that remaps a field's value to another value based on a static map.

    Attributes:
        map (Mapping[str, Any]): lookup table from original value to replacement value;
            it is interpolated against `config` (the attribute) on every transform
        field_path (List[Union[InterpolatedString, str]]): path segments of the field to
            remap; each segment is interpolated against the config being transformed
        config (Mapping[str, Any]): config used to interpolate `map`; defaults to {}
    """

    map: Mapping[str, Any]
    field_path: List[Union[InterpolatedString, str]]
    config: Mapping[str, Any] = field(default_factory=dict)

    def __post_init__(self) -> None:
        """
        Validate `field_path` and build the interpolators.

        :raises Exception: If `field_path` is empty
        """
        if not self.field_path:
            raise Exception("field_path cannot be empty.")
        # A single pass is enough: InterpolatedString.create is applied to every
        # segment here, whether it is a str or already an InterpolatedString.
        self._field_path = [
            InterpolatedString.create(path, parameters={}) for path in self.field_path
        ]
        self._map = InterpolatedMapping(self.map, parameters={})

    def transform(
        self,
        config: MutableMapping[str, Any],
    ) -> None:
        """
        Transforms a config by remapping a field value based on the provided map.
        If the original value is found in the map, it's replaced with the mapped value.
        If the value is not in the map, the field remains unchanged.
        If any parent of the field is missing or is not a mutable mapping, nothing happens.

        :param config: The user-provided configuration to be transformed
        """
        path_components = [path.eval(config) for path in self._field_path]

        # Walk down to the parent object of the target field.
        current = config
        for component in path_components[:-1]:
            if component not in current:
                return
            current = current[component]

            if not isinstance(current, MutableMapping):
                return

        field_name = path_components[-1]

        mapping = self._map.eval(config=self.config)

        if field_name not in current:
            return

        current_value = current[field_name]
        try:
            is_mapped = current_value in mapping
        except TypeError:
            # Unhashable values (e.g. lists or dicts) can never be keys of the map,
            # so they are treated as "not in the map" instead of crashing.
            is_mapped = False

        if is_mapped:
            current[field_name] = mapping[current_value]
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
#
2+
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
3+
#
4+
from dataclasses import dataclass
5+
from typing import Any, List, MutableMapping
6+
7+
import dpath
8+
import dpath.exceptions
9+
10+
from airbyte_cdk.sources.declarative.interpolation.interpolated_boolean import InterpolatedBoolean
11+
from airbyte_cdk.sources.declarative.transformations.config_transformations.config_transformation import (
12+
ConfigTransformation,
13+
)
14+
from airbyte_cdk.sources.types import FieldPointer
15+
16+
17+
@dataclass
class ConfigRemoveFields(ConfigTransformation):
    """
    Config transformation that deletes fields from a config. Fields are designated with FieldPointers.
    A pointer whose field (or any of its parents) is absent from the config is silently skipped.

    If a field pointer references an item in a list (e.g: ["k", 0] in the object {"k": ["a", "b", "c"]}) then
    the object at that index is set to None rather than being entirely removed from the list.

    Objects nested in lists can be removed too, e.g: removing [".", 0, "k"] from {".": [{"k": "V"}]} results in {".": [{}]}

    Usage syntax:

    ```yaml
        config_transformations:
          # NOTE(review): type name assumed to match the class name, as in the
          # ConfigAddFields example - confirm against the component schema
          - type: ConfigRemoveFields
            field_pointers:
              - ["path", "to", "field1"]
              - ["path2"]
            condition: "{{ config.some_flag }}"  # Optional condition
    ```

    Attributes:
        field_pointers (List[FieldPointer]): pointers to the fields that should be removed
        condition (str): Optional condition that determines if the fields should be removed
    """

    field_pointers: List[FieldPointer]
    condition: str = ""

    def __post_init__(self) -> None:
        """Build the interpolator used to evaluate `condition`."""
        self._filter_interpolator = InterpolatedBoolean(condition=self.condition, parameters={})

    def transform(self, config: MutableMapping[str, Any]) -> None:
        """
        Remove every pointed-to field from `config`, mutating it in place.

        Nothing is removed when a condition is set and evaluates falsy for this config.

        :param config: The user-provided configuration to be transformed
        """
        # An empty condition means "always remove"; the interpolator is only consulted
        # when a condition was actually provided.
        should_remove = not self.condition or self._filter_interpolator.eval(config)
        if not should_remove:
            return

        for field_pointer in self.field_pointers:
            try:
                dpath.delete(config, field_pointer)
            except dpath.exceptions.PathNotFound:
                # Missing fields are not an error: there is simply nothing to remove.
                continue
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
#
2+
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
3+
#
4+
5+
from airbyte_cdk.sources.declarative.validators.dpath_validator import DpathValidator
6+
from airbyte_cdk.sources.declarative.validators.predicate_validator import PredicateValidator
7+
from airbyte_cdk.sources.declarative.validators.validate_adheres_to_schema import (
8+
ValidateAdheresToSchema,
9+
)
10+
from airbyte_cdk.sources.declarative.validators.validation_strategy import ValidationStrategy
11+
from airbyte_cdk.sources.declarative.validators.validator import Validator
12+
13+
__all__ = [
14+
"Validator",
15+
"DpathValidator",
16+
"ValidationStrategy",
17+
"ValidateAdheresToSchema",
18+
"PredicateValidator",
19+
]
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
#
2+
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
3+
#
4+
5+
from dataclasses import dataclass
6+
from typing import Any, List, Union
7+
8+
import dpath.util
9+
10+
from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString
11+
from airbyte_cdk.sources.declarative.validators.validation_strategy import ValidationStrategy
12+
from airbyte_cdk.sources.declarative.validators.validator import Validator
13+
14+
15+
@dataclass
class DpathValidator(Validator):
    """
    Validator that extracts a value at a specific path in the input data
    and applies a validation strategy to it.

    Attributes:
        field_path (List[Union[InterpolatedString, str]]): path segments of the value to
            validate; a "*" segment validates every value matching the path
        strategy (ValidationStrategy): strategy applied to each extracted value
    """

    field_path: List[Union[InterpolatedString, str]]
    strategy: ValidationStrategy

    def __post_init__(self) -> None:
        """Turn every path segment into an InterpolatedString."""
        # A single pass is enough: InterpolatedString.create is applied to every
        # segment here, whether it is a str or already an InterpolatedString.
        self._field_path = [
            InterpolatedString.create(path, parameters={}) for path in self.field_path
        ]

    def validate(self, input_data: dict[str, Any]) -> None:
        """
        Extracts the value at the specified path and applies the validation strategy.

        :param input_data: Dictionary containing the data to validate
        :raises ValueError: If the path is empty, or a KeyError is raised while
            extracting or validating (e.g. the path doesn't exist)
        """
        # Segments are evaluated against an empty config.
        path = [segment.eval({}) for segment in self._field_path]

        if not path:
            raise ValueError("Field path is empty")

        try:
            if "*" in path:
                # Wildcard path: validate every matching value.
                values = dpath.values(input_data, path)
            else:
                values = [dpath.get(input_data, path)]

            for value in values:
                self.strategy.validate(value)
        except KeyError as e:
            # Chain the original error so the traceback keeps the root cause.
            raise ValueError(f"Error validating path '{self.field_path}': {e}") from e
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
#
2+
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
3+
#
4+
5+
from dataclasses import dataclass
6+
from typing import Any
7+
8+
from airbyte_cdk.sources.declarative.validators.validation_strategy import ValidationStrategy
9+
10+
11+
@dataclass
class PredicateValidator:
    """
    Pairs a fixed value with the validation strategy that should check it.
    """

    # The value handed to the strategy when validate() is called.
    value: Any
    # Object whose validate(value) method performs the actual check.
    strategy: ValidationStrategy

    def validate(self) -> None:
        """
        Run the strategy against the stored value.

        Any exception raised by the strategy propagates to the caller.

        :raises ValueError: If validation fails
        """
        strategy, candidate = self.strategy, self.value
        strategy.validate(candidate)

0 commit comments

Comments
 (0)