diff --git a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml index 2f09579ba..2d36004a3 100644 --- a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml +++ b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml @@ -2307,6 +2307,27 @@ definitions: $parameters: type: object additionalProperties: true + KeyTransformation: + title: Transformation to apply for extracted object keys by Dpath Flatten Fields + type: object + required: + - type + properties: + type: + type: string + enum: [ KeyTransformation ] + prefix: + title: Key Prefix + description: Prefix to add for object keys. If not provided original keys remain unchanged. + type: string + examples: + - flattened_ + suffix: + title: Key Suffix + description: Suffix to add for object keys. If not provided original keys remain unchanged. + type: string + examples: + - _flattened DpathFlattenFields: title: Dpath Flatten Fields description: A transformation that flatten field values to the to top of the record. @@ -2335,6 +2356,11 @@ definitions: title: Replace Origin Record description: Whether to replace the origin record or not. Default is False. type: boolean + key_transformation: + title: Key transformation + description: Transformation for object keys. If not provided, original key will be used. + type: object + "$ref": "#/definitions/KeyTransformation" $parameters: type: object additionalProperties: true diff --git a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py index 3566abef4..7f56ab92d 100644 --- a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py +++ b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py @@ -879,6 +879,25 @@ class FlattenFields(BaseModel): parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") +class KeyTransformation(BaseModel): + prefix: Optional[Union[str, None]] = Field( + None, + description="Prefix to add for object keys. If not provided original keys remain unchanged.", + examples=[ + "flattened_", + ], + title="Key Prefix", + ) + suffix: Optional[Union[str, None]] = Field( + None, + description="Suffix to add for object keys. If not provided original keys remain unchanged.", + examples=[ + "_flattened", + ], + title="Key Suffix", + ) + + class DpathFlattenFields(BaseModel): type: Literal["DpathFlattenFields"] field_path: List[str] = Field( @@ -897,6 +916,11 @@ class DpathFlattenFields(BaseModel): description="Whether to replace the origin record or not. Default is False.", title="Replace Origin Record", ) + key_transformation: Optional[Union[KeyTransformation, None]] = Field( + None, + description="Transformation for object keys. If not provided, original key will be used.", + title="Key transformation", + ) parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index 25840f06f..fc6ffc45d 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -498,6 +498,7 @@ from airbyte_cdk.sources.declarative.transformations.add_fields import AddedFieldDefinition from airbyte_cdk.sources.declarative.transformations.dpath_flatten_fields import ( DpathFlattenFields, + KeyTransformation, ) from airbyte_cdk.sources.declarative.transformations.flatten_fields import ( FlattenFields, @@ -790,6 +791,16 @@ def create_dpath_flatten_fields( self, model: DpathFlattenFieldsModel, config: Config, **kwargs: Any ) -> DpathFlattenFields: model_field_path: List[Union[InterpolatedString, str]] = [x for x in model.field_path] + key_transformation = ( + KeyTransformation( + config=config, + prefix=model.key_transformation.prefix, + suffix=model.key_transformation.suffix, + parameters=model.parameters or {}, + ) + if model.key_transformation is not None + else None + ) return DpathFlattenFields( config=config, field_path=model_field_path, @@ -797,6 +808,7 @@ def create_dpath_flatten_fields( if model.delete_origin_value is not None else False, replace_record=model.replace_record if model.replace_record is not None else False, + key_transformation=key_transformation, parameters=model.parameters or {}, ) diff --git a/airbyte_cdk/sources/declarative/transformations/dpath_flatten_fields.py b/airbyte_cdk/sources/declarative/transformations/dpath_flatten_fields.py index 1486f7667..e6b1a2f60 100644 --- a/airbyte_cdk/sources/declarative/transformations/dpath_flatten_fields.py +++ b/airbyte_cdk/sources/declarative/transformations/dpath_flatten_fields.py @@ -8,6 +8,24 @@ from airbyte_cdk.sources.types import Config, StreamSlice, StreamState +@dataclass +class KeyTransformation: + config: Config + parameters: InitVar[Mapping[str, Any]] + prefix: Optional[str] = None + suffix: Optional[str] = None + + def __post_init__(self, parameters: Mapping[str, Any]) -> None: + if self.prefix is not None: + self.prefix = InterpolatedString.create(self.prefix, parameters=parameters).eval( + self.config + ) + if self.suffix is not None: + self.suffix = InterpolatedString.create(self.suffix, parameters=parameters).eval( + self.config + ) + + @dataclass class DpathFlattenFields(RecordTransformation): """ @@ -16,6 +34,7 @@ class DpathFlattenFields(RecordTransformation): field_path: List[Union[InterpolatedString, str]] path to the field to flatten. delete_origin_value: bool = False whether to delete origin field or keep it. Default is False. replace_record: bool = False whether to replace origin record or not. Default is False. + key_transformation: KeyTransformation = None how to transform extracted object keys """ @@ -24,17 +43,35 @@ class DpathFlattenFields(RecordTransformation): parameters: InitVar[Mapping[str, Any]] delete_origin_value: bool = False replace_record: bool = False + key_transformation: Optional[KeyTransformation] = None def __post_init__(self, parameters: Mapping[str, Any]) -> None: + self._parameters = parameters self._field_path = [ - InterpolatedString.create(path, parameters=parameters) for path in self.field_path + InterpolatedString.create(path, parameters=self._parameters) for path in self.field_path ] for path_index in range(len(self.field_path)): if isinstance(self.field_path[path_index], str): self._field_path[path_index] = InterpolatedString.create( - self.field_path[path_index], parameters=parameters + self.field_path[path_index], parameters=self._parameters ) + def _apply_key_transformation(self, extracted: Mapping[str, Any]) -> Mapping[str, Any]: + if self.key_transformation: + if self.key_transformation.prefix: + extracted = { + f"{self.key_transformation.prefix}{key}": value + for key, value in extracted.items() + } + + if self.key_transformation.suffix: + extracted = { + f"{key}{self.key_transformation.suffix}": value + for key, value in extracted.items() + } + + return extracted + def transform( self, record: Dict[str, Any], @@ -50,6 +87,8 @@ def transform( extracted = dpath.get(record, path, default=[]) if isinstance(extracted, dict): + extracted = self._apply_key_transformation(extracted) + if self.replace_record and extracted: dpath.delete(record, "**") record.update(extracted) diff --git a/unit_tests/sources/declarative/transformations/test_dpath_flatten_fields.py b/unit_tests/sources/declarative/transformations/test_dpath_flatten_fields.py index 06842bc7d..f86fa7170 100644 --- a/unit_tests/sources/declarative/transformations/test_dpath_flatten_fields.py +++ b/unit_tests/sources/declarative/transformations/test_dpath_flatten_fields.py @@ -1,12 +1,18 @@ import pytest -from airbyte_cdk.sources.declarative.transformations.dpath_flatten_fields import DpathFlattenFields +from airbyte_cdk.sources.declarative.transformations.dpath_flatten_fields import ( + DpathFlattenFields, + KeyTransformation, +) _ANY_VALUE = -1 _DELETE_ORIGIN_VALUE = True _REPLACE_WITH_VALUE = True _DO_NOT_DELETE_ORIGIN_VALUE = False _DO_NOT_REPLACE_WITH_VALUE = False +_NO_KEY_PREFIX = None +_NO_KEY_SUFFIX = None +_NO_KEY_TRANSFORMATIONS = None @pytest.mark.parametrize( @@ -16,6 +22,7 @@ "field_path", "delete_origin_value", "replace_record", + "key_transformation", "expected_record", ], [ @@ -25,6 +32,7 @@ ["field2"], _DO_NOT_DELETE_ORIGIN_VALUE, _DO_NOT_REPLACE_WITH_VALUE, + _NO_KEY_TRANSFORMATIONS, {"field1": _ANY_VALUE, "field2": {"field3": _ANY_VALUE}, "field3": _ANY_VALUE}, id="flatten by dpath, don't delete origin value", ), @@ -34,6 +42,7 @@ ["field2"], _DELETE_ORIGIN_VALUE, _DO_NOT_REPLACE_WITH_VALUE, + _NO_KEY_TRANSFORMATIONS, {"field1": _ANY_VALUE, "field3": _ANY_VALUE}, id="flatten by dpath, delete origin value", ), @@ -46,6 +55,7 @@ ["field2", "*", "field4"], _DO_NOT_DELETE_ORIGIN_VALUE, _DO_NOT_REPLACE_WITH_VALUE, + _NO_KEY_TRANSFORMATIONS, { "field1": _ANY_VALUE, "field2": {"field3": {"field4": {"field5": _ANY_VALUE}}}, @@ -62,6 +72,7 @@ ["field2", "*", "field4"], _DELETE_ORIGIN_VALUE, _DO_NOT_REPLACE_WITH_VALUE, + _NO_KEY_TRANSFORMATIONS, {"field1": _ANY_VALUE, "field2": {"field3": {}}, "field5": _ANY_VALUE}, id="flatten by dpath with *, delete origin value", ), @@ -71,6 +82,7 @@ ["{{ config['field_path'] }}"], _DO_NOT_DELETE_ORIGIN_VALUE, _DO_NOT_REPLACE_WITH_VALUE, + _NO_KEY_TRANSFORMATIONS, {"field1": _ANY_VALUE, "field2": {"field3": _ANY_VALUE}, "field3": _ANY_VALUE}, id="flatten by dpath from config, don't delete origin value", ), @@ -80,6 +92,7 @@ ["non-existing-field"], _DO_NOT_DELETE_ORIGIN_VALUE, _DO_NOT_REPLACE_WITH_VALUE, + _NO_KEY_TRANSFORMATIONS, {"field1": _ANY_VALUE, "field2": {"field3": _ANY_VALUE}}, id="flatten by non-existing dpath, don't delete origin value", ), @@ -89,6 +102,7 @@ ["*", "non-existing-field"], _DO_NOT_DELETE_ORIGIN_VALUE, _DO_NOT_REPLACE_WITH_VALUE, + _NO_KEY_TRANSFORMATIONS, {"field1": _ANY_VALUE, "field2": {"field3": _ANY_VALUE}}, id="flatten by non-existing dpath with *, don't delete origin value", ), @@ -98,6 +112,7 @@ ["field2"], _DO_NOT_DELETE_ORIGIN_VALUE, _DO_NOT_REPLACE_WITH_VALUE, + _NO_KEY_TRANSFORMATIONS, {"field1": _ANY_VALUE, "field2": {"field3": _ANY_VALUE}, "field3": _ANY_VALUE}, id="flatten by dpath, not to update when record has field conflicts, don't delete origin value", ), @@ -107,6 +122,7 @@ ["field2"], _DO_NOT_DELETE_ORIGIN_VALUE, _DO_NOT_REPLACE_WITH_VALUE, + _NO_KEY_TRANSFORMATIONS, {"field1": _ANY_VALUE, "field2": {"field3": _ANY_VALUE}, "field3": _ANY_VALUE}, id="flatten by dpath, not to update when record has field conflicts, delete origin value", ), @@ -116,6 +132,7 @@ ["field2"], _DO_NOT_DELETE_ORIGIN_VALUE, _REPLACE_WITH_VALUE, + _NO_KEY_TRANSFORMATIONS, {"field3": _ANY_VALUE}, id="flatten by dpath, replace with value", ), @@ -125,20 +142,61 @@ ["field2"], _DELETE_ORIGIN_VALUE, _REPLACE_WITH_VALUE, + _NO_KEY_TRANSFORMATIONS, {"field3": _ANY_VALUE}, id="flatten by dpath, delete_origin_value do not affect to replace_record", ), + pytest.param( + {"field1": _ANY_VALUE, "field2": {"field3": _ANY_VALUE}}, + {}, + ["field2"], + _DO_NOT_DELETE_ORIGIN_VALUE, + _REPLACE_WITH_VALUE, + ("prefix_", _NO_KEY_SUFFIX), + {"prefix_field3": _ANY_VALUE}, + id="flatten by dpath, not delete origin value, replace record, add keys prefix", + ), + pytest.param( + {"field1": _ANY_VALUE, "field2": {"field3": _ANY_VALUE}}, + {}, + ["field2"], + _DO_NOT_DELETE_ORIGIN_VALUE, + _REPLACE_WITH_VALUE, + (_NO_KEY_PREFIX, "_suffix"), + {"field3_suffix": _ANY_VALUE}, + id="flatten by dpath, not delete origin value, replace record, add keys suffix", + ), + pytest.param( + {"field1": _ANY_VALUE, "field2": {"field3": _ANY_VALUE}}, + {}, + ["field2"], + _DO_NOT_DELETE_ORIGIN_VALUE, + _REPLACE_WITH_VALUE, + ("prefix_", "_suffix"), + {"prefix_field3_suffix": _ANY_VALUE}, + id="flatten by dpath, not delete origin value, replace record, add keys prefix and suffix", + ), ], ) def test_dpath_flatten_lists( - input_record, config, field_path, delete_origin_value, replace_record, expected_record + input_record, + config, + field_path, + delete_origin_value, + replace_record, + key_transformation, + expected_record, ): + if key_transformation: + key_transformation = KeyTransformation(config, {}, *key_transformation) + flattener = DpathFlattenFields( field_path=field_path, parameters={}, config=config, delete_origin_value=delete_origin_value, replace_record=replace_record, + key_transformation=key_transformation, ) flattener.transform(input_record) assert input_record == expected_record