From a11038e392abef9e4e8ebe854be1edd93477558e Mon Sep 17 00:00:00 2001 From: darynaishchenko Date: Thu, 10 Apr 2025 18:05:37 +0300 Subject: [PATCH 1/7] added key_transformation to DpathFlattenFields --- .../declarative_component_schema.yaml | 6 +++ .../models/declarative_component_schema.py | 8 ++++ .../parsers/model_to_component_factory.py | 1 + .../transformations/dpath_flatten_fields.py | 11 +++++ .../test_dpath_flatten_fields.py | 47 ++++++++++++++++++- 5 files changed, 71 insertions(+), 2 deletions(-) diff --git a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml index 2f09579ba..38c4d46f1 100644 --- a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml +++ b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml @@ -2335,6 +2335,12 @@ definitions: title: Replace Origin Record description: Whether to replace the origin record or not. Default is False. type: boolean + key_transformation: + title: Key transformation + description: Transformation for object keys. If not provided, original key will be used. + type: string + examples: + - "flattened_{{ key }}" $parameters: type: object additionalProperties: true diff --git a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py index 3566abef4..a8b5daa1c 100644 --- a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py +++ b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py @@ -897,6 +897,14 @@ class DpathFlattenFields(BaseModel): description="Whether to replace the origin record or not. Default is False.", title="Replace Origin Record", ) + key_transformation: Optional[str] = Field( + None, + description="Transformation for object keys. If not provided, original key will be used.", + examples=[ + "flattened_{{ key }}", + ], + title="Key transformation", + ) parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index 25840f06f..a5e48bc83 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -797,6 +797,7 @@ def create_dpath_flatten_fields( if model.delete_origin_value is not None else False, replace_record=model.replace_record if model.replace_record is not None else False, + key_transformation=model.key_transformation, parameters=model.parameters or {}, ) diff --git a/airbyte_cdk/sources/declarative/transformations/dpath_flatten_fields.py b/airbyte_cdk/sources/declarative/transformations/dpath_flatten_fields.py index 1486f7667..5ea9ba416 100644 --- a/airbyte_cdk/sources/declarative/transformations/dpath_flatten_fields.py +++ b/airbyte_cdk/sources/declarative/transformations/dpath_flatten_fields.py @@ -16,6 +16,7 @@ class DpathFlattenFields(RecordTransformation): field_path: List[Union[InterpolatedString, str]] path to the field to flatten. delete_origin_value: bool = False whether to delete origin field or keep it. Default is False. replace_record: bool = False whether to replace origin record or not. Default is False. + key_transformation: string = None how to transform extracted object keys """ @@ -24,6 +25,7 @@ class DpathFlattenFields(RecordTransformation): parameters: InitVar[Mapping[str, Any]] delete_origin_value: bool = False replace_record: bool = False + key_transformation: Union[InterpolatedString, str] = None def __post_init__(self, parameters: Mapping[str, Any]) -> None: self._field_path = [ @@ -34,6 +36,7 @@ def __post_init__(self, parameters: Mapping[str, Any]) -> None: self._field_path[path_index] = InterpolatedString.create( self.field_path[path_index], parameters=parameters ) + self.parameters = parameters def transform( self, @@ -50,6 +53,14 @@ def transform( extracted = dpath.get(record, path, default=[]) if isinstance(extracted, dict): + + if self.key_transformation: + updated_extracted = {} + for key, value in extracted.items(): + updated_key = InterpolatedString.create(self.key_transformation, parameters=self.parameters).eval(key=key, config=self.config) + updated_extracted[updated_key] = value + extracted = updated_extracted + if self.replace_record and extracted: dpath.delete(record, "**") record.update(extracted) diff --git a/unit_tests/sources/declarative/transformations/test_dpath_flatten_fields.py b/unit_tests/sources/declarative/transformations/test_dpath_flatten_fields.py index 06842bc7d..2ae535d45 100644 --- a/unit_tests/sources/declarative/transformations/test_dpath_flatten_fields.py +++ b/unit_tests/sources/declarative/transformations/test_dpath_flatten_fields.py @@ -7,7 +7,7 @@ _REPLACE_WITH_VALUE = True _DO_NOT_DELETE_ORIGIN_VALUE = False _DO_NOT_REPLACE_WITH_VALUE = False - +_NO_KEY_TRANSFORMATIONS = None @pytest.mark.parametrize( [ @@ -16,6 +16,7 @@ "field_path", "delete_origin_value", "replace_record", + "key_transformation", "expected_record", ], [ @@ -25,6 +26,7 @@ ["field2"], _DO_NOT_DELETE_ORIGIN_VALUE, _DO_NOT_REPLACE_WITH_VALUE, + _NO_KEY_TRANSFORMATIONS, {"field1": _ANY_VALUE, "field2": {"field3": _ANY_VALUE}, "field3": _ANY_VALUE}, id="flatten by dpath, don't delete origin value", ), @@ -34,6 +36,7 @@ ["field2"], _DELETE_ORIGIN_VALUE, _DO_NOT_REPLACE_WITH_VALUE, + _NO_KEY_TRANSFORMATIONS, {"field1": _ANY_VALUE, "field3": _ANY_VALUE}, id="flatten by dpath, delete origin value", ), @@ -46,6 +49,7 @@ ["field2", "*", "field4"], _DO_NOT_DELETE_ORIGIN_VALUE, _DO_NOT_REPLACE_WITH_VALUE, + _NO_KEY_TRANSFORMATIONS, { "field1": _ANY_VALUE, "field2": {"field3": {"field4": {"field5": _ANY_VALUE}}}, @@ -62,6 +66,7 @@ ["field2", "*", "field4"], _DELETE_ORIGIN_VALUE, _DO_NOT_REPLACE_WITH_VALUE, + _NO_KEY_TRANSFORMATIONS, {"field1": _ANY_VALUE, "field2": {"field3": {}}, "field5": _ANY_VALUE}, id="flatten by dpath with *, delete origin value", ), @@ -71,6 +76,7 @@ ["{{ config['field_path'] }}"], _DO_NOT_DELETE_ORIGIN_VALUE, _DO_NOT_REPLACE_WITH_VALUE, + _NO_KEY_TRANSFORMATIONS, {"field1": _ANY_VALUE, "field2": {"field3": _ANY_VALUE}, "field3": _ANY_VALUE}, id="flatten by dpath from config, don't delete origin value", ), @@ -80,6 +86,7 @@ ["non-existing-field"], _DO_NOT_DELETE_ORIGIN_VALUE, _DO_NOT_REPLACE_WITH_VALUE, + _NO_KEY_TRANSFORMATIONS, {"field1": _ANY_VALUE, "field2": {"field3": _ANY_VALUE}}, id="flatten by non-existing dpath, don't delete origin value", ), @@ -89,6 +96,7 @@ ["*", "non-existing-field"], _DO_NOT_DELETE_ORIGIN_VALUE, _DO_NOT_REPLACE_WITH_VALUE, + _NO_KEY_TRANSFORMATIONS, {"field1": _ANY_VALUE, "field2": {"field3": _ANY_VALUE}}, id="flatten by non-existing dpath with *, don't delete origin value", ), @@ -98,6 +106,7 @@ ["field2"], _DO_NOT_DELETE_ORIGIN_VALUE, _DO_NOT_REPLACE_WITH_VALUE, + _NO_KEY_TRANSFORMATIONS, {"field1": _ANY_VALUE, "field2": {"field3": _ANY_VALUE}, "field3": _ANY_VALUE}, id="flatten by dpath, not to update when record has field conflicts, don't delete origin value", ), @@ -107,6 +116,7 @@ ["field2"], _DO_NOT_DELETE_ORIGIN_VALUE, _DO_NOT_REPLACE_WITH_VALUE, + _NO_KEY_TRANSFORMATIONS, {"field1": _ANY_VALUE, "field2": {"field3": _ANY_VALUE}, "field3": _ANY_VALUE}, id="flatten by dpath, not to update when record has field conflicts, delete origin value", ), @@ -116,6 +126,7 @@ ["field2"], _DO_NOT_DELETE_ORIGIN_VALUE, _REPLACE_WITH_VALUE, + _NO_KEY_TRANSFORMATIONS, {"field3": _ANY_VALUE}, id="flatten by dpath, replace with value", ), @@ -125,13 +136,44 @@ ["field2"], _DELETE_ORIGIN_VALUE, _REPLACE_WITH_VALUE, + _NO_KEY_TRANSFORMATIONS, {"field3": _ANY_VALUE}, id="flatten by dpath, delete_origin_value do not affect to replace_record", ), + pytest.param( + {"field1": _ANY_VALUE, "field2": {"field3": _ANY_VALUE}}, + {}, + ["field2"], + _DO_NOT_DELETE_ORIGIN_VALUE, + _DO_NOT_REPLACE_WITH_VALUE, + "field2_{{ key }}", + {"field1": _ANY_VALUE, "field2": {"field3": _ANY_VALUE}, "field2_field3": _ANY_VALUE}, + id="flatten by dpath, not delete origin value, add keys transformation", + ), + pytest.param( + {"field1": _ANY_VALUE, "field2": {"field3": _ANY_VALUE}}, + {}, + ["field2"], + _DELETE_ORIGIN_VALUE, + _DO_NOT_REPLACE_WITH_VALUE, + "field2_{{ key }}", + {"field1": _ANY_VALUE, "field2_field3": _ANY_VALUE}, + id="flatten by dpath, delete origin value, add keys transformation", + ), + pytest.param( + {"field1": _ANY_VALUE, "field2": {"field3": _ANY_VALUE}}, + {}, + ["field2"], + _DO_NOT_DELETE_ORIGIN_VALUE, + _REPLACE_WITH_VALUE, + "field2_{{ key }}", + {"field2_field3": _ANY_VALUE}, + id="flatten by dpath, not delete origin value, replace record, add keys transformation", + ), ], ) def test_dpath_flatten_lists( - input_record, config, field_path, delete_origin_value, replace_record, expected_record + input_record, config, field_path, delete_origin_value, replace_record, key_transformation, expected_record ): flattener = DpathFlattenFields( field_path=field_path, @@ -139,6 +181,7 @@ def test_dpath_flatten_lists( config=config, delete_origin_value=delete_origin_value, replace_record=replace_record, + key_transformation=key_transformation, ) flattener.transform(input_record) assert input_record == expected_record From 58a64fe5fbceecc6522a10a9383bba0583cf6e44 Mon Sep 17 00:00:00 2001 From: darynaishchenko Date: Thu, 10 Apr 2025 18:26:26 +0300 Subject: [PATCH 2/7] mypy and format fix --- .../transformations/dpath_flatten_fields.py | 13 +++++++------ .../transformations/test_dpath_flatten_fields.py | 9 ++++++++- 2 files changed, 15 insertions(+), 7 deletions(-) diff --git a/airbyte_cdk/sources/declarative/transformations/dpath_flatten_fields.py b/airbyte_cdk/sources/declarative/transformations/dpath_flatten_fields.py index 5ea9ba416..62204b7a2 100644 --- a/airbyte_cdk/sources/declarative/transformations/dpath_flatten_fields.py +++ b/airbyte_cdk/sources/declarative/transformations/dpath_flatten_fields.py @@ -25,18 +25,18 @@ class DpathFlattenFields(RecordTransformation): parameters: InitVar[Mapping[str, Any]] delete_origin_value: bool = False replace_record: bool = False - key_transformation: Union[InterpolatedString, str] = None + key_transformation: Union[InterpolatedString, str, None] = None def __post_init__(self, parameters: Mapping[str, Any]) -> None: + self._parameters = parameters self._field_path = [ - InterpolatedString.create(path, parameters=parameters) for path in self.field_path + InterpolatedString.create(path, parameters=self._parameters) for path in self.field_path ] for path_index in range(len(self.field_path)): if isinstance(self.field_path[path_index], str): self._field_path[path_index] = InterpolatedString.create( - self.field_path[path_index], parameters=parameters + self.field_path[path_index], parameters=self._parameters ) - self.parameters = parameters def transform( self, @@ -53,11 +53,12 @@ def transform( extracted = dpath.get(record, path, default=[]) if isinstance(extracted, dict): - if self.key_transformation: updated_extracted = {} for key, value in extracted.items(): - updated_key = InterpolatedString.create(self.key_transformation, parameters=self.parameters).eval(key=key, config=self.config) + updated_key = InterpolatedString.create( + self.key_transformation, parameters=self._parameters + ).eval(key=key, config=self.config) updated_extracted[updated_key] = value extracted = updated_extracted diff --git a/unit_tests/sources/declarative/transformations/test_dpath_flatten_fields.py b/unit_tests/sources/declarative/transformations/test_dpath_flatten_fields.py index 2ae535d45..23fcdf1df 100644 --- a/unit_tests/sources/declarative/transformations/test_dpath_flatten_fields.py +++ b/unit_tests/sources/declarative/transformations/test_dpath_flatten_fields.py @@ -9,6 +9,7 @@ _DO_NOT_REPLACE_WITH_VALUE = False _NO_KEY_TRANSFORMATIONS = None + @pytest.mark.parametrize( [ "input_record", @@ -173,7 +174,13 @@ ], ) def test_dpath_flatten_lists( - input_record, config, field_path, delete_origin_value, replace_record, key_transformation, expected_record + input_record, + config, + field_path, + delete_origin_value, + replace_record, + key_transformation, + expected_record, ): flattener = DpathFlattenFields( field_path=field_path, From b94c05acd4e1f2007ce2f2a197218d5dd7f29890 Mon Sep 17 00:00:00 2001 From: darynaishchenko Date: Fri, 11 Apr 2025 15:36:35 +0300 Subject: [PATCH 3/7] refactored KeyTransformation: added prefix and suffix --- .../declarative_component_schema.yaml | 32 +++++++++- .../models/declarative_component_schema.py | 32 ++++++++-- .../parsers/model_to_component_factory.py | 12 +++- .../transformations/dpath_flatten_fields.py | 44 ++++++++++--- .../test_dpath_flatten_fields.py | 63 ++++++++++++++++--- 5 files changed, 158 insertions(+), 25 deletions(-) diff --git a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml index 38c4d46f1..adfaf4af4 100644 --- a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml +++ b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml @@ -2307,6 +2307,33 @@ definitions: $parameters: type: object additionalProperties: true + KeyTransformation: + title: Transformation to apply for extracted object keys by Dpath Flatten Fields + type: object + required: + - type + properties: + type: + type: string + enum: [ KeyTransformation ] + prefix: + title: Key Prefix + description: Prefix to add for object keys. If not provided original keys remain unchanged. + type: string + examples: + - flattened_ + suffix: + title: Key Suffix + description: Suffix to add for object keys. If not provided original keys remain unchanged. + type: string + examples: + - _flattened + custom: + title: Custom Transformation for keys + description: Custom transformation. Can be used with {{ key }} as a original value for key name. If not provided original keys remain unchanged. + type: string + examples: + - flattened_{{ key }} DpathFlattenFields: title: Dpath Flatten Fields description: A transformation that flatten field values to the to top of the record. @@ -2338,9 +2365,8 @@ definitions: key_transformation: title: Key transformation description: Transformation for object keys. If not provided, original key will be used. - type: string - examples: - - "flattened_{{ key }}" + type: object + "$ref": "#/definitions/KeyTransformation" $parameters: type: object additionalProperties: true diff --git a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py index a8b5daa1c..a11244d6a 100644 --- a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py +++ b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py @@ -879,6 +879,33 @@ class FlattenFields(BaseModel): parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") +class KeyTransformation(BaseModel): + prefix: Optional[Union[str, None]] = Field( + None, + description="Prefix to add for object keys. If not provided original keys remain unchanged.", + examples=[ + "flattened_", + ], + title="Key Prefix", + ) + suffix: Optional[Union[str, None]] = Field( + None, + description="Suffix to add for object keys. If not provided original keys remain unchanged.", + examples=[ + "_flattened", + ], + title="Key Suffix", + ) + custom: Optional[Union[str, None]] = Field( + None, + description="Custom transformation. Can be used with {{ key }} as a original value for key name. If not provided original keys remain unchanged.", + examples=[ + "flattened_{{ key }}", + ], + title="Custom Transformation for keys", + ) + + class DpathFlattenFields(BaseModel): type: Literal["DpathFlattenFields"] field_path: List[str] = Field( @@ -897,12 +924,9 @@ class DpathFlattenFields(BaseModel): description="Whether to replace the origin record or not. Default is False.", title="Replace Origin Record", ) - key_transformation: Optional[str] = Field( + key_transformation: Optional[Union[KeyTransformation, None]] = Field( None, description="Transformation for object keys. If not provided, original key will be used.", - examples=[ - "flattened_{{ key }}", - ], title="Key transformation", ) parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index a5e48bc83..b0d6acf58 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -498,6 +498,7 @@ from airbyte_cdk.sources.declarative.transformations.add_fields import AddedFieldDefinition from airbyte_cdk.sources.declarative.transformations.dpath_flatten_fields import ( DpathFlattenFields, + KeyTransformation, ) from airbyte_cdk.sources.declarative.transformations.flatten_fields import ( FlattenFields, @@ -790,6 +791,15 @@ def create_dpath_flatten_fields( self, model: DpathFlattenFieldsModel, config: Config, **kwargs: Any ) -> DpathFlattenFields: model_field_path: List[Union[InterpolatedString, str]] = [x for x in model.field_path] + key_transformation = ( + KeyTransformation( + prefix=model.key_transformation.prefix, + suffix=model.key_transformation.suffix, + custom=model.key_transformation.custom, + ) + if model.key_transformation is not None + else None + ) return DpathFlattenFields( config=config, field_path=model_field_path, @@ -797,7 +807,7 @@ def create_dpath_flatten_fields( if model.delete_origin_value is not None else False, replace_record=model.replace_record if model.replace_record is not None else False, - key_transformation=model.key_transformation, + key_transformation=key_transformation, parameters=model.parameters or {}, ) diff --git a/airbyte_cdk/sources/declarative/transformations/dpath_flatten_fields.py b/airbyte_cdk/sources/declarative/transformations/dpath_flatten_fields.py index 62204b7a2..43c1cd1bc 100644 --- a/airbyte_cdk/sources/declarative/transformations/dpath_flatten_fields.py +++ b/airbyte_cdk/sources/declarative/transformations/dpath_flatten_fields.py @@ -8,6 +8,13 @@ from airbyte_cdk.sources.types import Config, StreamSlice, StreamState +@dataclass(frozen=True) +class KeyTransformation: + prefix: Union[InterpolatedString, str, None] = None + suffix: Union[InterpolatedString, str, None] = None + custom: Union[InterpolatedString, str, None] = None + + @dataclass class DpathFlattenFields(RecordTransformation): """ @@ -16,7 +23,7 @@ class DpathFlattenFields(RecordTransformation): field_path: List[Union[InterpolatedString, str]] path to the field to flatten. delete_origin_value: bool = False whether to delete origin field or keep it. Default is False. replace_record: bool = False whether to replace origin record or not. Default is False. - key_transformation: string = None how to transform extracted object keys + key_transformation: KeyTransformation = None how to transform extracted object keys """ @@ -25,7 +32,7 @@ class DpathFlattenFields(RecordTransformation): parameters: InitVar[Mapping[str, Any]] delete_origin_value: bool = False replace_record: bool = False - key_transformation: Union[InterpolatedString, str, None] = None + key_transformation: Union[KeyTransformation, None] = None def __post_init__(self, parameters: Mapping[str, Any]) -> None: self._parameters = parameters @@ -38,6 +45,30 @@ def __post_init__(self, parameters: Mapping[str, Any]) -> None: self.field_path[path_index], parameters=self._parameters ) + def _apply_key_transformation(self, extracted: Mapping[str, Any]) -> Mapping[str, Any]: + if self.key_transformation: + if self.key_transformation.prefix: + prefix = InterpolatedString.create( + self.key_transformation.prefix, parameters=self._parameters + ).eval(config=self.config) + extracted = {f"{prefix}{key}": value for key, value in extracted.items()} + + if self.key_transformation.suffix: + suffix = InterpolatedString.create( + self.key_transformation.suffix, parameters=self._parameters + ).eval(config=self.config) + extracted = {f"{key}{suffix}": value for key, value in extracted.items()} + + if self.key_transformation.custom: + updated_extracted = {} + for key, value in extracted.items(): + updated_key = InterpolatedString.create( + self.key_transformation.custom, parameters=self._parameters + ).eval(key=key, config=self.config) + updated_extracted[updated_key] = value + extracted = updated_extracted + return extracted + def transform( self, record: Dict[str, Any], @@ -53,14 +84,7 @@ def transform( extracted = dpath.get(record, path, default=[]) if isinstance(extracted, dict): - if self.key_transformation: - updated_extracted = {} - for key, value in extracted.items(): - updated_key = InterpolatedString.create( - self.key_transformation, parameters=self._parameters - ).eval(key=key, config=self.config) - updated_extracted[updated_key] = value - extracted = updated_extracted + extracted = self._apply_key_transformation(extracted) if self.replace_record and extracted: dpath.delete(record, "**") diff --git a/unit_tests/sources/declarative/transformations/test_dpath_flatten_fields.py b/unit_tests/sources/declarative/transformations/test_dpath_flatten_fields.py index 23fcdf1df..e4df70820 100644 --- a/unit_tests/sources/declarative/transformations/test_dpath_flatten_fields.py +++ b/unit_tests/sources/declarative/transformations/test_dpath_flatten_fields.py @@ -1,12 +1,18 @@ import pytest -from airbyte_cdk.sources.declarative.transformations.dpath_flatten_fields import DpathFlattenFields +from airbyte_cdk.sources.declarative.transformations.dpath_flatten_fields import ( + DpathFlattenFields, + KeyTransformation, +) _ANY_VALUE = -1 _DELETE_ORIGIN_VALUE = True _REPLACE_WITH_VALUE = True _DO_NOT_DELETE_ORIGIN_VALUE = False _DO_NOT_REPLACE_WITH_VALUE = False +_NO_KEY_PREFIX = None +_NO_KEY_SUFFIX = None +_NO_CUSTOM_TRANSFORMATION = None _NO_KEY_TRANSFORMATIONS = None @@ -147,9 +153,9 @@ ["field2"], _DO_NOT_DELETE_ORIGIN_VALUE, _DO_NOT_REPLACE_WITH_VALUE, - "field2_{{ key }}", + (_NO_KEY_PREFIX, _NO_KEY_SUFFIX, "field2_{{ key }}"), {"field1": _ANY_VALUE, "field2": {"field3": _ANY_VALUE}, "field2_field3": _ANY_VALUE}, - id="flatten by dpath, not delete origin value, add keys transformation", + id="flatten by dpath, not delete origin value, add keys custom transformation", ), pytest.param( {"field1": _ANY_VALUE, "field2": {"field3": _ANY_VALUE}}, @@ -157,9 +163,9 @@ ["field2"], _DELETE_ORIGIN_VALUE, _DO_NOT_REPLACE_WITH_VALUE, - "field2_{{ key }}", + (_NO_KEY_PREFIX, _NO_KEY_SUFFIX, "field2_{{ key }}"), {"field1": _ANY_VALUE, "field2_field3": _ANY_VALUE}, - id="flatten by dpath, delete origin value, add keys transformation", + id="flatten by dpath, delete origin value, add keys custom transformation", ), pytest.param( {"field1": _ANY_VALUE, "field2": {"field3": _ANY_VALUE}}, @@ -167,9 +173,49 @@ ["field2"], _DO_NOT_DELETE_ORIGIN_VALUE, _REPLACE_WITH_VALUE, - "field2_{{ key }}", + (_NO_KEY_PREFIX, _NO_KEY_SUFFIX, "field2_{{ key }}"), {"field2_field3": _ANY_VALUE}, - id="flatten by dpath, not delete origin value, replace record, add keys transformation", + id="flatten by dpath, not delete origin value, replace record, add keys custom transformation", + ), + pytest.param( + {"field1": _ANY_VALUE, "field2": {"field3": _ANY_VALUE}}, + {}, + ["field2"], + _DO_NOT_DELETE_ORIGIN_VALUE, + _REPLACE_WITH_VALUE, + ("prefix_", _NO_KEY_SUFFIX, _NO_CUSTOM_TRANSFORMATION), + {"prefix_field3": _ANY_VALUE}, + id="flatten by dpath, not delete origin value, replace record, add keys prefix", + ), + pytest.param( + {"field1": _ANY_VALUE, "field2": {"field3": _ANY_VALUE}}, + {}, + ["field2"], + _DO_NOT_DELETE_ORIGIN_VALUE, + _REPLACE_WITH_VALUE, + (_NO_KEY_PREFIX, "_suffix", _NO_CUSTOM_TRANSFORMATION), + {"field3_suffix": _ANY_VALUE}, + id="flatten by dpath, not delete origin value, replace record, add keys suffix", + ), + pytest.param( + {"field1": _ANY_VALUE, "field2": {"field3": _ANY_VALUE}}, + {}, + ["field2"], + _DO_NOT_DELETE_ORIGIN_VALUE, + _REPLACE_WITH_VALUE, + ("prefix_", "_suffix", _NO_CUSTOM_TRANSFORMATION), + {"prefix_field3_suffix": _ANY_VALUE}, + id="flatten by dpath, not delete origin value, replace record, add keys prefix and suffix", + ), + pytest.param( + {"field1": _ANY_VALUE, "field2": {"field3": _ANY_VALUE}}, + {}, + ["field2"], + _DO_NOT_DELETE_ORIGIN_VALUE, + _REPLACE_WITH_VALUE, + ("prefix_", "_suffix", "{{ key|upper }}"), + {"PREFIX_FIELD3_SUFFIX": _ANY_VALUE}, + id="flatten by dpath, not delete origin value, replace record, add keys prefix and suffix with custom to upper case", ), ], ) @@ -182,6 +228,9 @@ def test_dpath_flatten_lists( key_transformation, expected_record, ): + if key_transformation: + key_transformation = KeyTransformation(*key_transformation) + flattener = DpathFlattenFields( field_path=field_path, parameters={}, From a9c1eb80a31343fcacb55a04388fe6a1e5e18bed Mon Sep 17 00:00:00 2001 From: darynaishchenko Date: Mon, 14 Apr 2025 17:03:09 +0300 Subject: [PATCH 4/7] removed custom option in KeyTransformation --- .../declarative_component_schema.yaml | 6 --- .../models/declarative_component_schema.py | 8 --- .../parsers/model_to_component_factory.py | 2 +- .../transformations/dpath_flatten_fields.py | 32 +++++------- .../test_dpath_flatten_fields.py | 49 ++----------------- 5 files changed, 17 insertions(+), 80 deletions(-) diff --git a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml index adfaf4af4..2d36004a3 100644 --- a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml +++ b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml @@ -2328,12 +2328,6 @@ definitions: type: string examples: - _flattened - custom: - title: Custom Transformation for keys - description: Custom transformation. Can be used with {{ key }} as a original value for key name. If not provided original keys remain unchanged. - type: string - examples: - - flattened_{{ key }} DpathFlattenFields: title: Dpath Flatten Fields description: A transformation that flatten field values to the to top of the record. diff --git a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py index a11244d6a..7f56ab92d 100644 --- a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py +++ b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py @@ -896,14 +896,6 @@ class KeyTransformation(BaseModel): ], title="Key Suffix", ) - custom: Optional[Union[str, None]] = Field( - None, - description="Custom transformation. Can be used with {{ key }} as a original value for key name. If not provided original keys remain unchanged.", - examples=[ - "flattened_{{ key }}", - ], - title="Custom Transformation for keys", - ) class DpathFlattenFields(BaseModel): diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index b0d6acf58..275d6de64 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -795,7 +795,7 @@ def create_dpath_flatten_fields( KeyTransformation( prefix=model.key_transformation.prefix, suffix=model.key_transformation.suffix, - custom=model.key_transformation.custom, + parameters=model.parameters or {}, ) if model.key_transformation is not None else None diff --git a/airbyte_cdk/sources/declarative/transformations/dpath_flatten_fields.py b/airbyte_cdk/sources/declarative/transformations/dpath_flatten_fields.py index 43c1cd1bc..3251e855f 100644 --- a/airbyte_cdk/sources/declarative/transformations/dpath_flatten_fields.py +++ b/airbyte_cdk/sources/declarative/transformations/dpath_flatten_fields.py @@ -8,11 +8,15 @@ from airbyte_cdk.sources.types import Config, StreamSlice, StreamState -@dataclass(frozen=True) +@dataclass class KeyTransformation: + parameters: InitVar[Mapping[str, Any]] prefix: Union[InterpolatedString, str, None] = None suffix: Union[InterpolatedString, str, None] = None - custom: Union[InterpolatedString, str, None] = None + + def __post_init__(self, parameters: Mapping[str, Any]) -> None: + self.prefix = InterpolatedString.create(self.prefix, parameters=parameters) + self.suffix = InterpolatedString.create(self.suffix, parameters=parameters) @dataclass @@ -32,7 +36,7 @@ class DpathFlattenFields(RecordTransformation): parameters: InitVar[Mapping[str, Any]] delete_origin_value: bool = False replace_record: bool = False - key_transformation: Union[KeyTransformation, None] = None + key_transformation: Optional[KeyTransformation] = None def __post_init__(self, parameters: Mapping[str, Any]) -> None: self._parameters = parameters @@ -48,25 +52,13 @@ def __post_init__(self, parameters: Mapping[str, Any]) -> None: def _apply_key_transformation(self, extracted: Mapping[str, Any]) -> Mapping[str, Any]: if self.key_transformation: if self.key_transformation.prefix: - prefix = InterpolatedString.create( - self.key_transformation.prefix, parameters=self._parameters - ).eval(config=self.config) - extracted = {f"{prefix}{key}": value for key, value in extracted.items()} + if prefix := self.key_transformation.prefix.eval(config=self.config): + extracted = {f"{prefix}{key}": value for key, value in extracted.items()} if self.key_transformation.suffix: - suffix = InterpolatedString.create( - self.key_transformation.suffix, parameters=self._parameters - ).eval(config=self.config) - extracted = {f"{key}{suffix}": value for key, value in extracted.items()} - - if self.key_transformation.custom: - updated_extracted = {} - for key, value in extracted.items(): - updated_key = InterpolatedString.create( - self.key_transformation.custom, parameters=self._parameters - ).eval(key=key, config=self.config) - updated_extracted[updated_key] = value - extracted = updated_extracted + if suffix := self.key_transformation.suffix.eval(config=self.config): + extracted = {f"{key}{suffix}": value for key, value in extracted.items()} + return extracted def transform( diff --git a/unit_tests/sources/declarative/transformations/test_dpath_flatten_fields.py b/unit_tests/sources/declarative/transformations/test_dpath_flatten_fields.py index e4df70820..eab162f87 100644 --- a/unit_tests/sources/declarative/transformations/test_dpath_flatten_fields.py +++ b/unit_tests/sources/declarative/transformations/test_dpath_flatten_fields.py @@ -12,7 +12,6 @@ _DO_NOT_REPLACE_WITH_VALUE = False _NO_KEY_PREFIX = None _NO_KEY_SUFFIX = None -_NO_CUSTOM_TRANSFORMATION = None _NO_KEY_TRANSFORMATIONS = None @@ -147,43 +146,13 @@ {"field3": _ANY_VALUE}, id="flatten by dpath, delete_origin_value do not affect to replace_record", ), - pytest.param( - {"field1": _ANY_VALUE, "field2": {"field3": _ANY_VALUE}}, - {}, - ["field2"], - _DO_NOT_DELETE_ORIGIN_VALUE, - _DO_NOT_REPLACE_WITH_VALUE, - (_NO_KEY_PREFIX, _NO_KEY_SUFFIX, "field2_{{ key }}"), - {"field1": _ANY_VALUE, "field2": {"field3": _ANY_VALUE}, "field2_field3": _ANY_VALUE}, - id="flatten by dpath, not delete origin value, add keys custom transformation", - ), - pytest.param( - {"field1": _ANY_VALUE, "field2": {"field3": _ANY_VALUE}}, - {}, - ["field2"], - _DELETE_ORIGIN_VALUE, - _DO_NOT_REPLACE_WITH_VALUE, - (_NO_KEY_PREFIX, _NO_KEY_SUFFIX, "field2_{{ key }}"), - {"field1": _ANY_VALUE, "field2_field3": _ANY_VALUE}, - id="flatten by dpath, delete origin value, add keys custom transformation", - ), pytest.param( {"field1": _ANY_VALUE, "field2": {"field3": _ANY_VALUE}}, {}, ["field2"], _DO_NOT_DELETE_ORIGIN_VALUE, _REPLACE_WITH_VALUE, - (_NO_KEY_PREFIX, _NO_KEY_SUFFIX, "field2_{{ key }}"), - {"field2_field3": _ANY_VALUE}, - id="flatten by dpath, not delete origin value, replace record, add keys custom transformation", - ), - pytest.param( - {"field1": _ANY_VALUE, "field2": {"field3": _ANY_VALUE}}, - {}, - ["field2"], - _DO_NOT_DELETE_ORIGIN_VALUE, - _REPLACE_WITH_VALUE, - ("prefix_", _NO_KEY_SUFFIX, _NO_CUSTOM_TRANSFORMATION), + ("prefix_", _NO_KEY_SUFFIX), {"prefix_field3": _ANY_VALUE}, id="flatten by dpath, not delete origin value, replace record, add keys prefix", ), @@ -193,7 +162,7 @@ ["field2"], _DO_NOT_DELETE_ORIGIN_VALUE, _REPLACE_WITH_VALUE, - (_NO_KEY_PREFIX, "_suffix", _NO_CUSTOM_TRANSFORMATION), + (_NO_KEY_PREFIX, "_suffix"), {"field3_suffix": _ANY_VALUE}, id="flatten by dpath, not delete origin value, replace record, add keys suffix", ), @@ -203,20 +172,10 @@ ["field2"], _DO_NOT_DELETE_ORIGIN_VALUE, _REPLACE_WITH_VALUE, - ("prefix_", "_suffix", _NO_CUSTOM_TRANSFORMATION), + ("prefix_", "_suffix"), {"prefix_field3_suffix": _ANY_VALUE}, id="flatten by dpath, not delete origin value, replace record, add keys prefix and suffix", ), - pytest.param( - {"field1": _ANY_VALUE, "field2": {"field3": _ANY_VALUE}}, - {}, - ["field2"], - _DO_NOT_DELETE_ORIGIN_VALUE, - _REPLACE_WITH_VALUE, - ("prefix_", "_suffix", "{{ key|upper }}"), - {"PREFIX_FIELD3_SUFFIX": _ANY_VALUE}, - id="flatten by dpath, not delete origin value, replace record, add keys prefix and suffix with custom to upper case", - ), ], ) def test_dpath_flatten_lists( @@ -229,7 +188,7 @@ def test_dpath_flatten_lists( expected_record, ): if key_transformation: - key_transformation = KeyTransformation(*key_transformation) + key_transformation = KeyTransformation({}, *key_transformation) flattener = DpathFlattenFields( field_path=field_path, From 19fdc2638869d1c331076b7c05f25149532a2057 Mon Sep 17 00:00:00 2001 From: darynaishchenko Date: Mon, 14 Apr 2025 17:22:01 +0300 Subject: [PATCH 5/7] mypy fix --- .../parsers/model_to_component_factory.py | 1 + .../transformations/dpath_flatten_fields.py | 13 +++++++------ .../transformations/test_dpath_flatten_fields.py | 2 +- 3 files changed, 9 insertions(+), 7 deletions(-) diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index 275d6de64..fc6ffc45d 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -793,6 +793,7 @@ def create_dpath_flatten_fields( model_field_path: List[Union[InterpolatedString, str]] = [x for x in model.field_path] key_transformation = ( KeyTransformation( + config=config, prefix=model.key_transformation.prefix, suffix=model.key_transformation.suffix, parameters=model.parameters or {}, diff --git a/airbyte_cdk/sources/declarative/transformations/dpath_flatten_fields.py b/airbyte_cdk/sources/declarative/transformations/dpath_flatten_fields.py index 3251e855f..743476312 100644 --- a/airbyte_cdk/sources/declarative/transformations/dpath_flatten_fields.py +++ b/airbyte_cdk/sources/declarative/transformations/dpath_flatten_fields.py @@ -10,13 +10,16 @@ @dataclass class KeyTransformation: + config: Config parameters: InitVar[Mapping[str, Any]] prefix: Union[InterpolatedString, str, None] = None suffix: Union[InterpolatedString, str, None] = None def __post_init__(self, parameters: Mapping[str, Any]) -> None: - self.prefix = InterpolatedString.create(self.prefix, parameters=parameters) - self.suffix = InterpolatedString.create(self.suffix, parameters=parameters) + if self.prefix is not None: + self.prefix = InterpolatedString.create(self.prefix, parameters=parameters).eval(self.config) + if self.suffix is not None: + self.suffix = InterpolatedString.create(self.suffix, parameters=parameters).eval(self.config) @dataclass @@ -52,12 +55,10 @@ def __post_init__(self, parameters: Mapping[str, Any]) -> None: def _apply_key_transformation(self, extracted: Mapping[str, Any]) -> Mapping[str, Any]: if self.key_transformation: if self.key_transformation.prefix: - if prefix := self.key_transformation.prefix.eval(config=self.config): - extracted = {f"{prefix}{key}": value for key, value in extracted.items()} + extracted = {f"{self.key_transformation.prefix}{key}": value for key, value in extracted.items()} if self.key_transformation.suffix: - if suffix := self.key_transformation.suffix.eval(config=self.config): - extracted = {f"{key}{suffix}": value for key, value in extracted.items()} + extracted = {f"{key}{self.key_transformation.suffix}": value for key, value in extracted.items()} return extracted diff --git a/unit_tests/sources/declarative/transformations/test_dpath_flatten_fields.py b/unit_tests/sources/declarative/transformations/test_dpath_flatten_fields.py index eab162f87..f86fa7170 100644 --- a/unit_tests/sources/declarative/transformations/test_dpath_flatten_fields.py +++ b/unit_tests/sources/declarative/transformations/test_dpath_flatten_fields.py @@ -188,7 +188,7 @@ def test_dpath_flatten_lists( expected_record, ): if key_transformation: - key_transformation = KeyTransformation({}, *key_transformation) + key_transformation = KeyTransformation(config, {}, *key_transformation) flattener = DpathFlattenFields( field_path=field_path, From 142322f212bcdd1c8798a3f730fe4370a98f61a7 Mon Sep 17 00:00:00 2001 From: octavia-squidington-iii Date: Mon, 14 Apr 2025 14:58:05 +0000 Subject: [PATCH 6/7] Auto-fix lint and format issues --- .../transformations/dpath_flatten_fields.py | 18 ++++++++++++++---- 1 file changed, 14 insertions(+), 4 deletions(-) diff --git a/airbyte_cdk/sources/declarative/transformations/dpath_flatten_fields.py b/airbyte_cdk/sources/declarative/transformations/dpath_flatten_fields.py index 743476312..b075cf95c 100644 --- a/airbyte_cdk/sources/declarative/transformations/dpath_flatten_fields.py +++ b/airbyte_cdk/sources/declarative/transformations/dpath_flatten_fields.py @@ -17,9 +17,13 @@ class KeyTransformation: def __post_init__(self, parameters: Mapping[str, Any]) -> None: if self.prefix is not None: - self.prefix = InterpolatedString.create(self.prefix, parameters=parameters).eval(self.config) + self.prefix = InterpolatedString.create(self.prefix, parameters=parameters).eval( + self.config + ) if self.suffix is not None: - self.suffix = InterpolatedString.create(self.suffix, parameters=parameters).eval(self.config) + self.suffix = InterpolatedString.create(self.suffix, parameters=parameters).eval( + self.config + ) @dataclass @@ -55,10 +59,16 @@ def __post_init__(self, parameters: Mapping[str, Any]) -> None: def _apply_key_transformation(self, extracted: Mapping[str, Any]) -> Mapping[str, Any]: if self.key_transformation: if self.key_transformation.prefix: - extracted = {f"{self.key_transformation.prefix}{key}": value for key, value in extracted.items()} + extracted = { + f"{self.key_transformation.prefix}{key}": value + for key, value in extracted.items() + } if self.key_transformation.suffix: - extracted = {f"{key}{self.key_transformation.suffix}": value for key, value in extracted.items()} + extracted = { + f"{key}{self.key_transformation.suffix}": value + for key, value in extracted.items() + } return extracted From ce7712b970d98dcd218a78fdc2779276b71f204f Mon Sep 17 00:00:00 2001 From: darynaishchenko Date: Tue, 15 Apr 2025 12:45:48 +0300 Subject: [PATCH 7/7] updated KeyTransformation --- .../declarative/transformations/dpath_flatten_fields.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/airbyte_cdk/sources/declarative/transformations/dpath_flatten_fields.py b/airbyte_cdk/sources/declarative/transformations/dpath_flatten_fields.py index b075cf95c..e6b1a2f60 100644 --- a/airbyte_cdk/sources/declarative/transformations/dpath_flatten_fields.py +++ b/airbyte_cdk/sources/declarative/transformations/dpath_flatten_fields.py @@ -12,8 +12,8 @@ class KeyTransformation: config: Config parameters: InitVar[Mapping[str, Any]] - prefix: Union[InterpolatedString, str, None] = None - suffix: Union[InterpolatedString, str, None] = None + prefix: Optional[str] = None + suffix: Optional[str] = None def __post_init__(self, parameters: Mapping[str, Any]) -> None: if self.prefix is not None: