Skip to content

Commit 80a1e0f

Browse files
lazebnyioctavia-squidington-iii
andauthored
feat(low-code cdk): add condition to add fields (#409)
Co-authored-by: octavia-squidington-iii <[email protected]>
1 parent cab392e commit 80a1e0f

File tree

5 files changed

+72
-3
lines changed

5 files changed

+72
-3
lines changed

airbyte_cdk/sources/declarative/declarative_component_schema.yaml

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,19 @@ definitions:
116116
type: array
117117
items:
118118
"$ref": "#/definitions/AddedFieldDefinition"
119+
condition:
120+
description: Fields will be added if expression is evaluated to True.
121+
type: string
122+
default: ""
123+
interpolation_context:
124+
- config
125+
- property
126+
- parameters
127+
examples:
128+
- "{{ property|string == '' }}"
129+
- "{{ property is integer }}"
130+
- "{{ property|length > 5 }}"
131+
- "{{ property == 'some_string_to_match' }}"
119132
$parameters:
120133
type: object
121134
additionalProperties: true

airbyte_cdk/sources/declarative/models/declarative_component_schema.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1465,6 +1465,16 @@ class AddFields(BaseModel):
14651465
description="List of transformations (path and corresponding value) that will be added to the record.",
14661466
title="Fields",
14671467
)
1468+
condition: Optional[str] = Field(
1469+
"",
1470+
description="Fields will be added if expression is evaluated to True.,",
1471+
examples=[
1472+
"{{ property|string == '' }}",
1473+
"{{ property is integer }}",
1474+
"{{ property|length > 5 }}",
1475+
"{{ property == 'some_string_to_match' }}",
1476+
],
1477+
)
14681478
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")
14691479

14701480

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -714,6 +714,7 @@ def create_add_fields(self, model: AddFieldsModel, config: Config, **kwargs: Any
714714
]
715715
return AddFields(
716716
fields=added_field_definitions,
717+
condition=model.condition or "",
717718
parameters=model.parameters or {},
718719
)
719720

airbyte_cdk/sources/declarative/transformations/add_fields.py

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
import dpath
99

10+
from airbyte_cdk.sources.declarative.interpolation.interpolated_boolean import InterpolatedBoolean
1011
from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString
1112
from airbyte_cdk.sources.declarative.transformations import RecordTransformation
1213
from airbyte_cdk.sources.types import Config, FieldPointer, StreamSlice, StreamState
@@ -86,11 +87,16 @@ class AddFields(RecordTransformation):
8687

8788
fields: List[AddedFieldDefinition]
8889
parameters: InitVar[Mapping[str, Any]]
90+
condition: str = ""
8991
_parsed_fields: List[ParsedAddFieldDefinition] = field(
9092
init=False, repr=False, default_factory=list
9193
)
9294

9395
def __post_init__(self, parameters: Mapping[str, Any]) -> None:
96+
self._filter_interpolator = InterpolatedBoolean(
97+
condition=self.condition, parameters=parameters
98+
)
99+
94100
for add_field in self.fields:
95101
if len(add_field.path) < 1:
96102
raise ValueError(
@@ -132,7 +138,9 @@ def transform(
132138
for parsed_field in self._parsed_fields:
133139
valid_types = (parsed_field.value_type,) if parsed_field.value_type else None
134140
value = parsed_field.value.eval(config, valid_types=valid_types, **kwargs)
135-
dpath.new(record, parsed_field.path, value)
141+
is_empty_condition = not self.condition
142+
if is_empty_condition or self._filter_interpolator.eval(config, value=value, **kwargs):
143+
dpath.new(record, parsed_field.path, value)
136144

137145
def __eq__(self, other: Any) -> bool:
138146
return bool(self.__dict__ == other.__dict__)

unit_tests/sources/declarative/transformations/test_add_fields.py

Lines changed: 39 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,12 +12,13 @@
1212

1313

1414
@pytest.mark.parametrize(
15-
["input_record", "field", "field_type", "kwargs", "expected"],
15+
["input_record", "field", "field_type", "condition", "kwargs", "expected"],
1616
[
1717
pytest.param(
1818
{"k": "v"},
1919
[(["path"], "static_value")],
2020
None,
21+
"",
2122
{},
2223
{"k": "v", "path": "static_value"},
2324
id="add new static value",
@@ -26,6 +27,7 @@
2627
{"k": "v"},
2728
[(["path"], "{{ 1 }}")],
2829
None,
30+
"",
2931
{},
3032
{"k": "v", "path": 1},
3133
id="add an expression evaluated as a number",
@@ -34,6 +36,7 @@
3436
{"k": "v"},
3537
[(["path"], "{{ 1 }}")],
3638
str,
39+
"",
3740
{},
3841
{"k": "v", "path": "1"},
3942
id="add an expression evaluated as a string using the value_type field",
@@ -42,6 +45,7 @@
4245
{"k": "v"},
4346
[(["path"], "static_value"), (["path2"], "static_value2")],
4447
None,
48+
"",
4549
{},
4650
{"k": "v", "path": "static_value", "path2": "static_value2"},
4751
id="add new multiple static values",
@@ -50,6 +54,7 @@
5054
{"k": "v"},
5155
[(["nested", "path"], "static_value")],
5256
None,
57+
"",
5358
{},
5459
{"k": "v", "nested": {"path": "static_value"}},
5560
id="set static value at nested path",
@@ -58,6 +63,7 @@
5863
{"k": "v"},
5964
[(["k"], "new_value")],
6065
None,
66+
"",
6167
{},
6268
{"k": "new_value"},
6369
id="update value which already exists",
@@ -66,6 +72,7 @@
6672
{"k": [0, 1]},
6773
[(["k", 3], "v")],
6874
None,
75+
"",
6976
{},
7077
{"k": [0, 1, None, "v"]},
7178
id="Set element inside array",
@@ -74,6 +81,7 @@
7481
{"k": "v"},
7582
[(["k2"], '{{ config["shop"] }}')],
7683
None,
84+
"",
7785
{"config": {"shop": "in-n-out"}},
7886
{"k": "v", "k2": "in-n-out"},
7987
id="set a value from the config using bracket notation",
@@ -82,6 +90,7 @@
8290
{"k": "v"},
8391
[(["k2"], "{{ config.shop }}")],
8492
None,
93+
"",
8594
{"config": {"shop": "in-n-out"}},
8695
{"k": "v", "k2": "in-n-out"},
8796
id="set a value from the config using dot notation",
@@ -90,6 +99,7 @@
9099
{"k": "v"},
91100
[(["k2"], '{{ stream_slice["start_date"] }}')],
92101
None,
102+
"",
93103
{"stream_slice": {"start_date": "oct1"}},
94104
{"k": "v", "k2": "oct1"},
95105
id="set a value from the stream slice using bracket notation",
@@ -98,6 +108,7 @@
98108
{"k": "v"},
99109
[(["k2"], "{{ stream_slice.start_date }}")],
100110
None,
111+
"",
101112
{"stream_slice": {"start_date": "oct1"}},
102113
{"k": "v", "k2": "oct1"},
103114
id="set a value from the stream slice using dot notation",
@@ -106,6 +117,7 @@
106117
{"k": "v"},
107118
[(["k2"], "{{ record.k }}")],
108119
None,
120+
"",
109121
{},
110122
{"k": "v", "k2": "v"},
111123
id="set a value from a field in the record using dot notation",
@@ -114,6 +126,7 @@
114126
{"k": "v"},
115127
[(["k2"], '{{ record["k"] }}')],
116128
None,
129+
"",
117130
{},
118131
{"k": "v", "k2": "v"},
119132
id="set a value from a field in the record using bracket notation",
@@ -122,6 +135,7 @@
122135
{"k": {"nested": "v"}},
123136
[(["k2"], "{{ record.k.nested }}")],
124137
None,
138+
"",
125139
{},
126140
{"k": {"nested": "v"}, "k2": "v"},
127141
id="set a value from a nested field in the record using bracket notation",
@@ -130,6 +144,7 @@
130144
{"k": {"nested": "v"}},
131145
[(["k2"], '{{ record["k"]["nested"] }}')],
132146
None,
147+
"",
133148
{},
134149
{"k": {"nested": "v"}, "k2": "v"},
135150
id="set a value from a nested field in the record using bracket notation",
@@ -138,22 +153,44 @@
138153
{"k": "v"},
139154
[(["k2"], "{{ 2 + 2 }}")],
140155
None,
156+
"",
141157
{},
142158
{"k": "v", "k2": 4},
143159
id="set a value from a jinja expression",
144160
),
161+
pytest.param(
162+
{"k": "v"},
163+
[(["path"], "static_value")],
164+
None,
165+
"{{ False }}",
166+
{},
167+
{"k": "v"},
168+
id="do not add any field if condition is boolean False",
169+
),
170+
pytest.param(
171+
{"k": "v"},
172+
[(["path"], "static_value")],
173+
None,
174+
"{{ True }}",
175+
{},
176+
{"k": "v", "path": "static_value"},
177+
id="add all fields if condition is boolean True",
178+
),
145179
],
146180
)
147181
def test_add_fields(
148182
input_record: Mapping[str, Any],
149183
field: List[Tuple[FieldPointer, str]],
150184
field_type: Optional[str],
185+
condition: Optional[str],
151186
kwargs: Mapping[str, Any],
152187
expected: Mapping[str, Any],
153188
):
154189
inputs = [
155190
AddedFieldDefinition(path=v[0], value=v[1], value_type=field_type, parameters={})
156191
for v in field
157192
]
158-
AddFields(fields=inputs, parameters={"alas": "i live"}).transform(input_record, **kwargs)
193+
AddFields(fields=inputs, condition=condition, parameters={"alas": "i live"}).transform(
194+
input_record, **kwargs
195+
)
159196
assert input_record == expected

0 commit comments

Comments
 (0)