Skip to content

Commit b94c05a

Browse files
refactored KeyTransformation: added prefix and suffix
1 parent 58a64fe commit b94c05a

File tree

5 files changed

+158
-25
lines changed

5 files changed

+158
-25
lines changed

airbyte_cdk/sources/declarative/declarative_component_schema.yaml

Lines changed: 29 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2307,6 +2307,33 @@ definitions:
23072307
$parameters:
23082308
type: object
23092309
additionalProperties: true
2310+
KeyTransformation:
2311+
title: Transformation to apply for extracted object keys by Dpath Flatten Fields
2312+
type: object
2313+
required:
2314+
- type
2315+
properties:
2316+
type:
2317+
type: string
2318+
enum: [ KeyTransformation ]
2319+
prefix:
2320+
title: Key Prefix
2321+
description: Prefix to add for object keys. If not provided original keys remain unchanged.
2322+
type: string
2323+
examples:
2324+
- flattened_
2325+
suffix:
2326+
title: Key Suffix
2327+
description: Suffix to add for object keys. If not provided original keys remain unchanged.
2328+
type: string
2329+
examples:
2330+
- _flattened
2331+
custom:
2332+
title: Custom Transformation for keys
2333+
description: Custom transformation. Can be used with {{ key }} as a original value for key name. If not provided original keys remain unchanged.
2334+
type: string
2335+
examples:
2336+
- flattened_{{ key }}
23102337
DpathFlattenFields:
23112338
title: Dpath Flatten Fields
23122339
description: A transformation that flatten field values to the to top of the record.
@@ -2338,9 +2365,8 @@ definitions:
23382365
key_transformation:
23392366
title: Key transformation
23402367
description: Transformation for object keys. If not provided, original key will be used.
2341-
type: string
2342-
examples:
2343-
- "flattened_{{ key }}"
2368+
type: object
2369+
"$ref": "#/definitions/KeyTransformation"
23442370
$parameters:
23452371
type: object
23462372
additionalProperties: true

airbyte_cdk/sources/declarative/models/declarative_component_schema.py

Lines changed: 28 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -879,6 +879,33 @@ class FlattenFields(BaseModel):
879879
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")
880880

881881

882+
class KeyTransformation(BaseModel):
883+
prefix: Optional[Union[str, None]] = Field(
884+
None,
885+
description="Prefix to add for object keys. If not provided original keys remain unchanged.",
886+
examples=[
887+
"flattened_",
888+
],
889+
title="Key Prefix",
890+
)
891+
suffix: Optional[Union[str, None]] = Field(
892+
None,
893+
description="Suffix to add for object keys. If not provided original keys remain unchanged.",
894+
examples=[
895+
"_flattened",
896+
],
897+
title="Key Suffix",
898+
)
899+
custom: Optional[Union[str, None]] = Field(
900+
None,
901+
description="Custom transformation. Can be used with {{ key }} as a original value for key name. If not provided original keys remain unchanged.",
902+
examples=[
903+
"flattened_{{ key }}",
904+
],
905+
title="Custom Transformation for keys",
906+
)
907+
908+
882909
class DpathFlattenFields(BaseModel):
883910
type: Literal["DpathFlattenFields"]
884911
field_path: List[str] = Field(
@@ -897,12 +924,9 @@ class DpathFlattenFields(BaseModel):
897924
description="Whether to replace the origin record or not. Default is False.",
898925
title="Replace Origin Record",
899926
)
900-
key_transformation: Optional[str] = Field(
927+
key_transformation: Optional[Union[KeyTransformation, None]] = Field(
901928
None,
902929
description="Transformation for object keys. If not provided, original key will be used.",
903-
examples=[
904-
"flattened_{{ key }}",
905-
],
906930
title="Key transformation",
907931
)
908932
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -498,6 +498,7 @@
498498
from airbyte_cdk.sources.declarative.transformations.add_fields import AddedFieldDefinition
499499
from airbyte_cdk.sources.declarative.transformations.dpath_flatten_fields import (
500500
DpathFlattenFields,
501+
KeyTransformation,
501502
)
502503
from airbyte_cdk.sources.declarative.transformations.flatten_fields import (
503504
FlattenFields,
@@ -790,14 +791,23 @@ def create_dpath_flatten_fields(
790791
self, model: DpathFlattenFieldsModel, config: Config, **kwargs: Any
791792
) -> DpathFlattenFields:
792793
model_field_path: List[Union[InterpolatedString, str]] = [x for x in model.field_path]
794+
key_transformation = (
795+
KeyTransformation(
796+
prefix=model.key_transformation.prefix,
797+
suffix=model.key_transformation.suffix,
798+
custom=model.key_transformation.custom,
799+
)
800+
if model.key_transformation is not None
801+
else None
802+
)
793803
return DpathFlattenFields(
794804
config=config,
795805
field_path=model_field_path,
796806
delete_origin_value=model.delete_origin_value
797807
if model.delete_origin_value is not None
798808
else False,
799809
replace_record=model.replace_record if model.replace_record is not None else False,
800-
key_transformation=model.key_transformation,
810+
key_transformation=key_transformation,
801811
parameters=model.parameters or {},
802812
)
803813

airbyte_cdk/sources/declarative/transformations/dpath_flatten_fields.py

Lines changed: 34 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,13 @@
88
from airbyte_cdk.sources.types import Config, StreamSlice, StreamState
99

1010

11+
@dataclass(frozen=True)
12+
class KeyTransformation:
13+
prefix: Union[InterpolatedString, str, None] = None
14+
suffix: Union[InterpolatedString, str, None] = None
15+
custom: Union[InterpolatedString, str, None] = None
16+
17+
1118
@dataclass
1219
class DpathFlattenFields(RecordTransformation):
1320
"""
@@ -16,7 +23,7 @@ class DpathFlattenFields(RecordTransformation):
1623
field_path: List[Union[InterpolatedString, str]] path to the field to flatten.
1724
delete_origin_value: bool = False whether to delete origin field or keep it. Default is False.
1825
replace_record: bool = False whether to replace origin record or not. Default is False.
19-
key_transformation: string = None how to transform extracted object keys
26+
key_transformation: KeyTransformation = None how to transform extracted object keys
2027
2128
"""
2229

@@ -25,7 +32,7 @@ class DpathFlattenFields(RecordTransformation):
2532
parameters: InitVar[Mapping[str, Any]]
2633
delete_origin_value: bool = False
2734
replace_record: bool = False
28-
key_transformation: Union[InterpolatedString, str, None] = None
35+
key_transformation: Union[KeyTransformation, None] = None
2936

3037
def __post_init__(self, parameters: Mapping[str, Any]) -> None:
3138
self._parameters = parameters
@@ -38,6 +45,30 @@ def __post_init__(self, parameters: Mapping[str, Any]) -> None:
3845
self.field_path[path_index], parameters=self._parameters
3946
)
4047

48+
def _apply_key_transformation(self, extracted: Mapping[str, Any]) -> Mapping[str, Any]:
49+
if self.key_transformation:
50+
if self.key_transformation.prefix:
51+
prefix = InterpolatedString.create(
52+
self.key_transformation.prefix, parameters=self._parameters
53+
).eval(config=self.config)
54+
extracted = {f"{prefix}{key}": value for key, value in extracted.items()}
55+
56+
if self.key_transformation.suffix:
57+
suffix = InterpolatedString.create(
58+
self.key_transformation.suffix, parameters=self._parameters
59+
).eval(config=self.config)
60+
extracted = {f"{key}{suffix}": value for key, value in extracted.items()}
61+
62+
if self.key_transformation.custom:
63+
updated_extracted = {}
64+
for key, value in extracted.items():
65+
updated_key = InterpolatedString.create(
66+
self.key_transformation.custom, parameters=self._parameters
67+
).eval(key=key, config=self.config)
68+
updated_extracted[updated_key] = value
69+
extracted = updated_extracted
70+
return extracted
71+
4172
def transform(
4273
self,
4374
record: Dict[str, Any],
@@ -53,14 +84,7 @@ def transform(
5384
extracted = dpath.get(record, path, default=[])
5485

5586
if isinstance(extracted, dict):
56-
if self.key_transformation:
57-
updated_extracted = {}
58-
for key, value in extracted.items():
59-
updated_key = InterpolatedString.create(
60-
self.key_transformation, parameters=self._parameters
61-
).eval(key=key, config=self.config)
62-
updated_extracted[updated_key] = value
63-
extracted = updated_extracted
87+
extracted = self._apply_key_transformation(extracted)
6488

6589
if self.replace_record and extracted:
6690
dpath.delete(record, "**")

unit_tests/sources/declarative/transformations/test_dpath_flatten_fields.py

Lines changed: 56 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,18 @@
11
import pytest
22

3-
from airbyte_cdk.sources.declarative.transformations.dpath_flatten_fields import DpathFlattenFields
3+
from airbyte_cdk.sources.declarative.transformations.dpath_flatten_fields import (
4+
DpathFlattenFields,
5+
KeyTransformation,
6+
)
47

58
_ANY_VALUE = -1
69
_DELETE_ORIGIN_VALUE = True
710
_REPLACE_WITH_VALUE = True
811
_DO_NOT_DELETE_ORIGIN_VALUE = False
912
_DO_NOT_REPLACE_WITH_VALUE = False
13+
_NO_KEY_PREFIX = None
14+
_NO_KEY_SUFFIX = None
15+
_NO_CUSTOM_TRANSFORMATION = None
1016
_NO_KEY_TRANSFORMATIONS = None
1117

1218

@@ -147,29 +153,69 @@
147153
["field2"],
148154
_DO_NOT_DELETE_ORIGIN_VALUE,
149155
_DO_NOT_REPLACE_WITH_VALUE,
150-
"field2_{{ key }}",
156+
(_NO_KEY_PREFIX, _NO_KEY_SUFFIX, "field2_{{ key }}"),
151157
{"field1": _ANY_VALUE, "field2": {"field3": _ANY_VALUE}, "field2_field3": _ANY_VALUE},
152-
id="flatten by dpath, not delete origin value, add keys transformation",
158+
id="flatten by dpath, not delete origin value, add keys custom transformation",
153159
),
154160
pytest.param(
155161
{"field1": _ANY_VALUE, "field2": {"field3": _ANY_VALUE}},
156162
{},
157163
["field2"],
158164
_DELETE_ORIGIN_VALUE,
159165
_DO_NOT_REPLACE_WITH_VALUE,
160-
"field2_{{ key }}",
166+
(_NO_KEY_PREFIX, _NO_KEY_SUFFIX, "field2_{{ key }}"),
161167
{"field1": _ANY_VALUE, "field2_field3": _ANY_VALUE},
162-
id="flatten by dpath, delete origin value, add keys transformation",
168+
id="flatten by dpath, delete origin value, add keys custom transformation",
163169
),
164170
pytest.param(
165171
{"field1": _ANY_VALUE, "field2": {"field3": _ANY_VALUE}},
166172
{},
167173
["field2"],
168174
_DO_NOT_DELETE_ORIGIN_VALUE,
169175
_REPLACE_WITH_VALUE,
170-
"field2_{{ key }}",
176+
(_NO_KEY_PREFIX, _NO_KEY_SUFFIX, "field2_{{ key }}"),
171177
{"field2_field3": _ANY_VALUE},
172-
id="flatten by dpath, not delete origin value, replace record, add keys transformation",
178+
id="flatten by dpath, not delete origin value, replace record, add keys custom transformation",
179+
),
180+
pytest.param(
181+
{"field1": _ANY_VALUE, "field2": {"field3": _ANY_VALUE}},
182+
{},
183+
["field2"],
184+
_DO_NOT_DELETE_ORIGIN_VALUE,
185+
_REPLACE_WITH_VALUE,
186+
("prefix_", _NO_KEY_SUFFIX, _NO_CUSTOM_TRANSFORMATION),
187+
{"prefix_field3": _ANY_VALUE},
188+
id="flatten by dpath, not delete origin value, replace record, add keys prefix",
189+
),
190+
pytest.param(
191+
{"field1": _ANY_VALUE, "field2": {"field3": _ANY_VALUE}},
192+
{},
193+
["field2"],
194+
_DO_NOT_DELETE_ORIGIN_VALUE,
195+
_REPLACE_WITH_VALUE,
196+
(_NO_KEY_PREFIX, "_suffix", _NO_CUSTOM_TRANSFORMATION),
197+
{"field3_suffix": _ANY_VALUE},
198+
id="flatten by dpath, not delete origin value, replace record, add keys suffix",
199+
),
200+
pytest.param(
201+
{"field1": _ANY_VALUE, "field2": {"field3": _ANY_VALUE}},
202+
{},
203+
["field2"],
204+
_DO_NOT_DELETE_ORIGIN_VALUE,
205+
_REPLACE_WITH_VALUE,
206+
("prefix_", "_suffix", _NO_CUSTOM_TRANSFORMATION),
207+
{"prefix_field3_suffix": _ANY_VALUE},
208+
id="flatten by dpath, not delete origin value, replace record, add keys prefix and suffix",
209+
),
210+
pytest.param(
211+
{"field1": _ANY_VALUE, "field2": {"field3": _ANY_VALUE}},
212+
{},
213+
["field2"],
214+
_DO_NOT_DELETE_ORIGIN_VALUE,
215+
_REPLACE_WITH_VALUE,
216+
("prefix_", "_suffix", "{{ key|upper }}"),
217+
{"PREFIX_FIELD3_SUFFIX": _ANY_VALUE},
218+
id="flatten by dpath, not delete origin value, replace record, add keys prefix and suffix with custom to upper case",
173219
),
174220
],
175221
)
@@ -182,6 +228,9 @@ def test_dpath_flatten_lists(
182228
key_transformation,
183229
expected_record,
184230
):
231+
if key_transformation:
232+
key_transformation = KeyTransformation(*key_transformation)
233+
185234
flattener = DpathFlattenFields(
186235
field_path=field_path,
187236
parameters={},

0 commit comments

Comments
 (0)