Commit 38b7362

Updated the structure to address versioning and the order of the registered migrations; added the `metadata.applied_migrations` object so that each applied migration can be traced (`MigrationTrace`).
1 parent 2f168ba commit 38b7362

13 files changed: +341 -163 lines changed

airbyte_cdk/manifest_migrations/README.md

+40 -33
@@ -6,44 +6,53 @@ This directory contains the logic and registry for manifest migrations in the Ai
 
 1. **Create a Migration File:**
    - Add a new Python file in the `migrations/` subdirectory.
-   - Name the file using the pattern: `<description>_v<major>_<minor>_<patch>__<order>.py`.
-     - Example: `http_requester_url_base_to_url_v6_45_2__0.py`
-   - The `<order>` integer is used to determine the order of migrations for the same version.
+   - Name the file using the pattern: `<description_of_the_migration>.py`.
+     - Example: `http_requester_url_base_to_url.py`
+   - The filename should be unique and descriptive.
 
 2. **Define the Migration Class:**
    - The migration class must inherit from `ManifestMigration`.
-   - Name the class using the pattern: `V_<major>_<minor>_<patch>_<Description>`.
-     - Example: `V_6_45_2_HttpRequesterUrlBaseToUrl`
+   - Name the class using a descriptive name (e.g., `HttpRequesterUrlBaseToUrl`).
    - Implement the following methods:
-     - `should_migrate(self, manifest: ManifestType) -> bool`: Return `True` if the migration should be applied to the given manifest.
-     - `migrate(self, manifest: ManifestType) -> None`: Perform the migration in-place.
-
-3. **Migration Versioning:**
-   - The migration version is extracted from the class name and used to determine applicability.
-   - Only manifests with a version less than or equal to the migration version will be migrated.
-
-4. **Component Type:**
-   - Use the `TYPE_TAG` constant to check the component type in your migration logic.
-
-5. **Examples:**
-   - See `migrations/http_requester_url_base_to_url_v6_45_2__0.py` and `migrations/http_requester_path_to_url_v6_45_2__1.py` for reference implementations.
-
-## Migration Registry
-
-- All migration classes in the `migrations/` folder are automatically discovered and registered in `migrations_registry.py`.
-- Migrations are applied in order, determined by the `<order>` suffix in the filename.
-
-## Testing
-
-- Ensure your migration is covered by unit tests.
-- Tests should verify both `should_migrate` and `migrate` behaviors.
+     - `should_migrate(self, manifest: ManifestType) -> bool`
+     - `migrate(self, manifest: ManifestType) -> None`
+     - `validate(self, manifest: ManifestType) -> bool`
+
+3. **Register the Migration:**
+   - Open `migrations/registry.yaml`.
+   - Add an entry under the appropriate version, or create a new version section if needed.
+   - Each migration entry should include:
+     - `name`: The filename (without `.py`)
+     - `order`: The order in which this migration should be applied for the version
+     - `description`: A short description of the migration
+
+   Example:
+   ```yaml
+   manifest_migrations:
+     - version: 6.45.2
+       migrations:
+         - name: http_requester_url_base_to_url
+           order: 1
+           description: |
+             This migration updates the `url_base` field in the `HttpRequester` component spec to `url`.
+   ```
+
+4. **Testing:**
+   - Ensure your migration is covered by unit tests.
+   - Tests should verify the `should_migrate`, `migrate`, and `validate` behaviors.
+
+## Migration Discovery
+
+- Migrations are discovered and registered automatically based on the entries in `migrations/registry.yaml`.
+- Do not modify the migration registry in code manually.
+- If you need to skip certain component types, use the `NON_MIGRATABLE_TYPES` list in `manifest_migration.py`.
 
 ## Example Migration Skeleton
 
 ```python
 from airbyte_cdk.manifest_migrations.manifest_migration import TYPE_TAG, ManifestMigration, ManifestType
 
-class V_1_2_3_Example(ManifestMigration):
+class ExampleMigration(ManifestMigration):
     component_type = "ExampleComponent"
     original_key = "old_key"
     replacement_key = "new_key"
@@ -54,12 +63,10 @@ class V_1_2_3_Example(ManifestMigration):
     def migrate(self, manifest: ManifestType) -> None:
         manifest[self.replacement_key] = manifest[self.original_key]
         manifest.pop(self.original_key, None)
-```
 
-## Additional Notes
-
-- Do not modify the migration registry manually; it will pick up all valid migration classes automatically.
-- If you need to skip certain component types, use the `NON_MIGRATABLE_TYPES` list in `manifest_migration.py`.
+    def validate(self, manifest: ManifestType) -> bool:
+        return self.replacement_key in manifest and self.original_key not in manifest
+```
 
 ---
 
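
For reference, the three-method contract described in the README diff above can be exercised in isolation. The following is a minimal sketch, not the CDK's code: `ManifestMigrationStandIn` is a hypothetical, simplified replacement for `ManifestMigration` so that the snippet runs without `airbyte_cdk` installed, and the body of `should_migrate` is an assumption, because that part of the skeleton falls outside the hunks shown here.

```python
from abc import ABC, abstractmethod
from typing import Any, Dict

ManifestType = Dict[str, Any]
TYPE_TAG = "type"


class ManifestMigrationStandIn(ABC):
    """Hypothetical, simplified stand-in for the CDK's ManifestMigration base class."""

    @abstractmethod
    def should_migrate(self, manifest: ManifestType) -> bool: ...

    @abstractmethod
    def migrate(self, manifest: ManifestType) -> None: ...

    @abstractmethod
    def validate(self, manifest: ManifestType) -> bool: ...


class ExampleMigration(ManifestMigrationStandIn):
    component_type = "ExampleComponent"
    original_key = "old_key"
    replacement_key = "new_key"

    def should_migrate(self, manifest: ManifestType) -> bool:
        # Assumed check: only touch components of the expected type that
        # still carry the old key.
        return (
            manifest.get(TYPE_TAG) == self.component_type
            and self.original_key in manifest
        )

    def migrate(self, manifest: ManifestType) -> None:
        # Rename the key in place, as in the README skeleton.
        manifest[self.replacement_key] = manifest[self.original_key]
        manifest.pop(self.original_key, None)

    def validate(self, manifest: ManifestType) -> bool:
        # The migration succeeded if the new key exists and the old one is gone.
        return self.replacement_key in manifest and self.original_key not in manifest


def run_example() -> ManifestType:
    component: ManifestType = {"type": "ExampleComponent", "old_key": "value"}
    migration = ExampleMigration()
    if migration.should_migrate(component):
        migration.migrate(component)
        assert migration.validate(component)
    return component
```

A unit test for a real migration could follow the same shape: check `should_migrate` before and after, then check `validate` on the migrated component.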

@@ -0,0 +1,3 @@
+#
+# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
+#

airbyte_cdk/manifest_migrations/exceptions.py

+1 -1
@@ -1,5 +1,5 @@
 #
-# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
+# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
 #
 
 

airbyte_cdk/manifest_migrations/manifest_migration.py

+41 -34
@@ -1,29 +1,57 @@
-# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
+#
+# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
+#
 
-import re
-from abc import abstractmethod
-from typing import Any, Dict
 
-from packaging.version import Version
+from abc import ABC, abstractmethod
+from dataclasses import asdict, dataclass
+from typing import Any, Dict
 
 ManifestType = Dict[str, Any]
 
 
 TYPE_TAG = "type"
 
 NON_MIGRATABLE_TYPES = [
+    # more info here: https://github.com/airbytehq/airbyte-internal-issues/issues/12423
     "DynamicDeclarativeStream",
 ]
 
 
-class ManifestMigration:
+@dataclass
+class MigrationTrace:
+    """
+    This class represents a migration that has been applied to the manifest.
+    It contains information about the migration, including the version it was applied from,
+    the version it was applied to, and the time it was applied.
+    """
+
+    from_version: str
+    to_version: str
+    migration: str
+    migrated_at: str
+
+    def as_dict(self) -> dict:
+        return asdict(self)
+
+
+class ManifestMigration(ABC):
+    """
+    Base class for manifest migrations.
+    This class provides a framework for migrating manifest components.
+    It defines the structure for migration classes, including methods for checking if a migration is needed,
+    performing the migration, and validating the migration.
+    """
+
+    def __init__(self) -> None:
+        self.is_migrated: bool = False
+
     @abstractmethod
     def should_migrate(self, manifest: ManifestType) -> bool:
         """
         Check if the manifest should be migrated.
 
         :param manifest: The manifest to potentially migrate
-        :param kwargs: Additional arguments for migration
 
         :return: true if the manifest is of the expected format and should be migrated. False otherwise.
         """
@@ -34,17 +62,15 @@ def migrate(self, manifest: ManifestType) -> None:
         Migrate the manifest. Assumes should_migrate(manifest) returned True.
 
         :param manifest: The manifest to migrate
-        :param kwargs: Additional arguments for migration
         """
 
-    @property
-    def migration_version(self) -> Version:
+    @abstractmethod
+    def validate(self, manifest: ManifestType) -> bool:
         """
-        Get the migration version.
+        Validate the manifest to ensure the migration was successfully applied.
 
-        :return: The migration version as a Version object
+        :param manifest: The manifest to validate
         """
-        return Version(self._get_migration_version())
 
     def _is_component(self, obj: Dict[str, Any]) -> bool:
         """
@@ -95,6 +121,8 @@ def _process_manifest(self, obj: Any) -> None:
                 if self.should_migrate(obj):
                     # Perform the migration, if needed
                     self.migrate(obj)
+                    # validate the migration
+                    self.is_migrated = self.validate(obj)
 
             # Process all values in the dictionary
             for value in list(obj.values()):
@@ -104,24 +132,3 @@ def _process_manifest(self, obj: Any) -> None:
             # Process all items in the list
             for item in obj:
                 self._process_manifest(item)
-
-    def _get_migration_version(self) -> str:
-        """
-        Get the migration version from the class name.
-        The migration version is extracted from the class name using a regular expression.
-        The expected format is "V_<major>_<minor>_<patch>_<migration_name>".
-
-        For example, "V_6_45_2_HttpRequesterPathToUrl" -> "6.45.2"
-
-        :return: The migration version as a string in the format "major.minor.patch"
-        :raises ValueError: If the class name does not match the expected format
-        """
-
-        class_name = self.__class__.__name__
-        migration_version = re.search(r"V_(\d+_\d+_\d+)", class_name)
-        if migration_version:
-            return migration_version.group(1).replace("_", ".")
-        else:
-            raise ValueError(
-                f"Invalid migration class name, make sure the class name has the version (e.g `V_<major>_<minor>_<patch>_<migration_name>`): {class_name}"
-            )
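
The traversal touched by this diff (migrate a component, validate it, then recurse into nested values) can be illustrated with a small standalone sketch. `RenameKeyWalker` and its `process` method are hypothetical names, and the rule that any dict carrying a `type` tag counts as a component is an assumption made for the example; the CDK's `_is_component` and `NON_MIGRATABLE_TYPES` handling are not reproduced here.

```python
from typing import Any, Dict

TYPE_TAG = "type"


class RenameKeyWalker:
    """Hypothetical walker that renames `old_key` to `new_key` on each component."""

    def __init__(self) -> None:
        # Mirrors the `is_migrated` flag introduced in the diff.
        self.is_migrated: bool = False

    def should_migrate(self, component: Dict[str, Any]) -> bool:
        return "old_key" in component

    def migrate(self, component: Dict[str, Any]) -> None:
        component["new_key"] = component.pop("old_key")

    def validate(self, component: Dict[str, Any]) -> bool:
        return "new_key" in component and "old_key" not in component

    def process(self, obj: Any) -> None:
        if isinstance(obj, dict):
            # Assumption: a dict with a `type` tag is treated as a component.
            if TYPE_TAG in obj and self.should_migrate(obj):
                self.migrate(obj)
                # The flag records the result of the most recent validation.
                self.is_migrated = self.validate(obj)
            # Snapshot the values, then recurse into nested structures.
            for value in list(obj.values()):
                self.process(value)
        elif isinstance(obj, list):
            for item in obj:
                self.process(item)


manifest = {
    "type": "Source",
    "streams": [
        {"type": "Stream", "old_key": 1},
        {"type": "Stream", "old_key": 2},
    ],
}
walker = RenameKeyWalker()
walker.process(manifest)
```

In this sketch the flag is overwritten every time a component is migrated, so it reflects only the last component validated; whether that is the intended behaviour for the real class is a question for the author.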

airbyte_cdk/manifest_migrations/migration_handler.py

+70 -11
@@ -1,9 +1,11 @@
 #
-# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
+# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
 #
 
+
 import copy
 import logging
+from datetime import datetime, timezone
 from typing import Type
 
 from packaging.version import Version
@@ -14,11 +16,16 @@
 from airbyte_cdk.manifest_migrations.manifest_migration import (
     ManifestMigration,
     ManifestType,
+    MigrationTrace,
 )
 from airbyte_cdk.manifest_migrations.migrations_registry import (
-    MIGRATIONS,
+    MANIFEST_MIGRATIONS,
 )
 
+METADATA_TAG = "metadata"
+MANIFEST_VERSION_TAG = "version"
+APPLIED_MIGRATIONS_TAG = "applied_migrations"
+
 LOGGER = logging.getLogger("airbyte.cdk.manifest_migrations")
 
 
@@ -30,7 +37,6 @@ class ManifestMigrationHandler:
     def __init__(self, manifest: ManifestType) -> None:
         self._manifest = manifest
         self._migrated_manifest: ManifestType = copy.deepcopy(self._manifest)
-        self._manifest_version: Version = self._get_manifest_version()
 
     def apply_migrations(self) -> ManifestType:
         """
@@ -45,14 +51,21 @@ def apply_migrations(self) -> ManifestType:
             manifest if any migration failed.
         """
         try:
-            for migration_cls in MIGRATIONS:
-                self._handle_migration(migration_cls)
+            manifest_version = self._get_manifest_version()
+            for migration_version, migrations in MANIFEST_MIGRATIONS.items():
+                for migration_cls in migrations:
+                    self._handle_migration(migration_cls, manifest_version, migration_version)
             return self._migrated_manifest
         except ManifestMigrationException:
             # if any errors occur we return the original resolved manifest
             return self._manifest
 
-    def _handle_migration(self, migration_class: Type[ManifestMigration]) -> None:
+    def _handle_migration(
+        self,
+        migration_class: Type[ManifestMigration],
+        manifest_version: str,
+        migration_version: str,
+    ) -> None:
         """
         Handles a single manifest migration by instantiating the migration class and processing the manifest.
 
@@ -64,21 +77,67 @@ def _handle_migration(self, migration_class: Type[ManifestMigration]) -> None:
         """
         try:
             migration_instance = migration_class()
-            # check if the migration is supported for the given manifest version
-            if self._manifest_version <= migration_instance.migration_version:
+            if self._version_is_valid_for_migration(manifest_version, migration_version):
                 migration_instance._process_manifest(self._migrated_manifest)
+                if migration_instance.is_migrated:
+                    # set the updated manifest version, after migration has been applied
+                    self._set_manifest_version(migration_version)
+                    # set the migration trace
+                    self._set_migration_trace(migration_class, manifest_version, migration_version)
             else:
                 LOGGER.info(
-                    f"Manifest migration: `{migration_class.__name__}` is not supported for the given manifest version `{self._manifest_version}`.",
+                    f"Manifest migration: `{migration_class.__name__}` is not supported for the given manifest version `{manifest_version}`.",
                 )
         except Exception as e:
             raise ManifestMigrationException(str(e)) from e
 
-    def _get_manifest_version(self) -> Version:
+    def _get_manifest_version(self) -> str:
         """
         Get the manifest version from the manifest.
 
         :param manifest: The manifest to get the version from
         :return: The manifest version
         """
-        return Version(str(self._migrated_manifest.get("version", "0.0.0")))
+        return str(self._migrated_manifest.get(MANIFEST_VERSION_TAG, "0.0.0"))
+
+    def _version_is_valid_for_migration(
+        self, manifest_version: str, migration_version: str
+    ) -> bool:
+        return Version(manifest_version) <= Version(migration_version)
+
+    def _set_manifest_version(self, version: str) -> None:
+        """
+        Set the manifest version in the manifest.
+
+        :param version: The version to set
+        """
+        self._migrated_manifest[MANIFEST_VERSION_TAG] = version
+
+    def _set_migration_trace(
+        self,
+        migration_instance: Type[ManifestMigration],
+        manifest_version: str,
+        migration_version: str,
+    ) -> None:
+        """
+        Set the migration trace in the manifest.
+
+        :param migration_instance: The migration instance to set
+        :param manifest_version: The manifest version before migration
+        :param migration_version: The manifest version after migration
+        """
+
+        if METADATA_TAG not in self._migrated_manifest:
+            self._migrated_manifest[METADATA_TAG] = {}
+        if APPLIED_MIGRATIONS_TAG not in self._migrated_manifest[METADATA_TAG]:
+            self._migrated_manifest[METADATA_TAG][APPLIED_MIGRATIONS_TAG] = []
+
+        migration_trace = MigrationTrace(
+            from_version=manifest_version,
+            to_version=migration_version,
+            migration=migration_instance.__name__,
+            migrated_at=datetime.now(tz=timezone.utc).isoformat(),
+        ).as_dict()
+
+        if migration_version not in self._migrated_manifest[METADATA_TAG][APPLIED_MIGRATIONS_TAG]:
+            self._migrated_manifest[METADATA_TAG][APPLIED_MIGRATIONS_TAG].append(migration_trace)
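
To make the version gate and the `metadata.applied_migrations` bookkeeping easier to follow, here is a rough standalone sketch of the same idea. It is not the handler's code: `parse_version` is a hypothetical stand-in for `packaging.version.Version` that only handles plain `major.minor.patch` strings, and `version_allows_migration` and `record_migration` are illustrative names. The `MigrationTrace` fields are copied from the dataclass added in this commit.

```python
from dataclasses import asdict, dataclass
from datetime import datetime, timezone
from typing import Any, Dict, Tuple

ManifestType = Dict[str, Any]


@dataclass
class MigrationTrace:
    from_version: str
    to_version: str
    migration: str
    migrated_at: str


def parse_version(version: str) -> Tuple[int, ...]:
    # Stand-in for packaging.version.Version (assumption): numeric
    # "major.minor.patch" strings only, no pre-release or local segments.
    return tuple(int(part) for part in version.split("."))


def version_allows_migration(manifest_version: str, migration_version: str) -> bool:
    # A migration applies when the manifest is at or below the migration version.
    return parse_version(manifest_version) <= parse_version(migration_version)


def record_migration(
    manifest: ManifestType,
    migration_name: str,
    from_version: str,
    to_version: str,
) -> None:
    # Bump the manifest version and append a trace entry under
    # metadata.applied_migrations, creating the containers if missing.
    manifest["version"] = to_version
    applied = manifest.setdefault("metadata", {}).setdefault("applied_migrations", [])
    trace = MigrationTrace(
        from_version=from_version,
        to_version=to_version,
        migration=migration_name,
        migrated_at=datetime.now(tz=timezone.utc).isoformat(),
    )
    applied.append(asdict(trace))


manifest: ManifestType = {"version": "6.44.0"}
if version_allows_migration(manifest["version"], "6.45.2"):
    record_migration(manifest, "HttpRequesterUrlBaseToUrl", "6.44.0", "6.45.2")
```

Comparing parsed versions rather than raw strings matters here: as plain strings, `"6.9.0"` would sort after `"6.45.2"`, which is presumably why the handler wraps both sides in `Version`.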
@@ -0,0 +1,4 @@
+#
+# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
+#
+