Skip to content

Commit 3237334

Browse files
authored
feat: update ManifestDeclarativeSource to invoke declarative config migrations/transformations/validations (#561)
1 parent 4b73b46 commit 3237334

File tree

7 files changed

+419
-149
lines changed

7 files changed

+419
-149
lines changed

airbyte_cdk/cli/source_declarative_manifest/_run.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ def __init__(
5858
catalog: ConfiguredAirbyteCatalog | None,
5959
config: MutableMapping[str, Any] | None,
6060
state: TState,
61+
config_path: str | None = None,
6162
**kwargs: Any,
6263
) -> None:
6364
"""
@@ -76,6 +77,7 @@ def __init__(
7677
config=config,
7778
state=state, # type: ignore [arg-type]
7879
path_to_yaml="manifest.yaml",
80+
config_path=config_path,
7981
)
8082

8183

airbyte_cdk/sources/declarative/concurrent_declarative_source.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@ def __init__(
7474
debug: bool = False,
7575
emit_connector_builder_messages: bool = False,
7676
component_factory: Optional[ModelToComponentFactory] = None,
77+
config_path: Optional[str] = None,
7778
**kwargs: Any,
7879
) -> None:
7980
# todo: We could remove state from initialization. Now that streams are grouped during the read(), a source
@@ -96,6 +97,7 @@ def __init__(
9697
debug=debug,
9798
emit_connector_builder_messages=emit_connector_builder_messages,
9899
component_factory=component_factory,
100+
config_path=config_path,
99101
)
100102

101103
concurrency_level_from_manifest = self._source_config.get("concurrency_level")

airbyte_cdk/sources/declarative/manifest_declarative_source.py

Lines changed: 45 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
#
2-
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
2+
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
33
#
44

55
import json
@@ -8,13 +8,15 @@
88
from copy import deepcopy
99
from importlib import metadata
1010
from types import ModuleType
11-
from typing import Any, Dict, Iterator, List, Mapping, Optional, Set
11+
from typing import Any, Dict, Iterator, List, Mapping, MutableMapping, Optional, Set
1212

13+
import orjson
1314
import yaml
1415
from jsonschema.exceptions import ValidationError
1516
from jsonschema.validators import validate
1617
from packaging.version import InvalidVersion, Version
1718

19+
from airbyte_cdk.config_observation import create_connector_config_control_message
1820
from airbyte_cdk.connector_builder.models import (
1921
LogMessage as ConnectorBuilderLogMessage,
2022
)
@@ -29,6 +31,7 @@
2931
ConnectorSpecification,
3032
FailureType,
3133
)
34+
from airbyte_cdk.models.airbyte_protocol_serializers import AirbyteMessageSerializer
3235
from airbyte_cdk.sources.declarative.checks import COMPONENTS_CHECKER_TYPE_MAPPING
3336
from airbyte_cdk.sources.declarative.checks.connection_checker import ConnectionChecker
3437
from airbyte_cdk.sources.declarative.declarative_source import DeclarativeSource
@@ -57,9 +60,10 @@
5760
ModelToComponentFactory,
5861
)
5962
from airbyte_cdk.sources.declarative.resolvers import COMPONENTS_RESOLVER_TYPE_MAPPING
63+
from airbyte_cdk.sources.declarative.spec.spec import Spec
6064
from airbyte_cdk.sources.message import MessageRepository
6165
from airbyte_cdk.sources.streams.core import Stream
62-
from airbyte_cdk.sources.types import ConnectionDefinition
66+
from airbyte_cdk.sources.types import Config, ConnectionDefinition
6367
from airbyte_cdk.sources.utils.slice_logger import (
6468
AlwaysLogSliceLogger,
6569
DebugSliceLogger,
@@ -99,6 +103,7 @@ def __init__(
99103
component_factory: Optional[ModelToComponentFactory] = None,
100104
migrate_manifest: Optional[bool] = False,
101105
normalize_manifest: Optional[bool] = False,
106+
config_path: Optional[str] = None,
102107
) -> None:
103108
"""
104109
Args:
@@ -108,6 +113,7 @@ def __init__(
108113
emit_connector_builder_messages: True if messages should be emitted to the connector builder.
109114
component_factory: optional factory if ModelToComponentFactory's default behavior needs to be tweaked.
110115
normalize_manifest: Optional flag to indicate if the manifest should be normalized.
116+
config_path: Optional path to the config file.
111117
"""
112118
self.logger = logging.getLogger(f"airbyte.{self.name}")
113119
self._should_normalize = normalize_manifest
@@ -130,7 +136,6 @@ def __init__(
130136
self._slice_logger: SliceLogger = (
131137
AlwaysLogSliceLogger() if emit_connector_builder_messages else DebugSliceLogger()
132138
)
133-
self._config = config or {}
134139

135140
# resolve all components in the manifest
136141
self._source_config = self._pre_process_manifest(dict(source_config))
@@ -139,6 +144,12 @@ def __init__(
139144
# apply additional post-processing to the manifest
140145
self._post_process_manifest()
141146

147+
spec: Optional[Mapping[str, Any]] = self._source_config.get("spec")
148+
self._spec_component: Optional[Spec] = (
149+
self._constructor.create_component(SpecModel, spec, dict()) if spec else None
150+
)
151+
self._config = self._migrate_and_transform_config(config_path, config) or {}
152+
142153
@property
143154
def resolved_manifest(self) -> Mapping[str, Any]:
144155
"""
@@ -199,6 +210,30 @@ def _normalize_manifest(self) -> None:
199210
normalizer = ManifestNormalizer(self._source_config, self._declarative_component_schema)
200211
self._source_config = normalizer.normalize()
201212

213+
def _migrate_and_transform_config(
214+
self,
215+
config_path: Optional[str],
216+
config: Optional[Config],
217+
) -> Optional[Config]:
218+
if not config:
219+
return None
220+
if not self._spec_component:
221+
return config
222+
mutable_config = dict(config)
223+
self._spec_component.migrate_config(mutable_config)
224+
if mutable_config != config:
225+
if config_path:
226+
with open(config_path, "w") as f:
227+
json.dump(mutable_config, f)
228+
self.message_repository.emit_message(
229+
create_connector_config_control_message(mutable_config)
230+
)
231+
# We have no mechanism for consuming the queue, so we print the messages to stdout
232+
for message in self.message_repository.consume_queue():
233+
print(orjson.dumps(AirbyteMessageSerializer.dump(message)).decode())
234+
self._spec_component.transform_config(mutable_config)
235+
return mutable_config
236+
202237
def _migrate_manifest(self) -> None:
203238
"""
204239
This method is used to migrate the manifest. It should be called after the manifest has been validated.
@@ -255,6 +290,9 @@ def connection_checker(self) -> ConnectionChecker:
255290
)
256291

257292
def streams(self, config: Mapping[str, Any]) -> List[Stream]:
293+
if self._spec_component:
294+
self._spec_component.validate_config(config)
295+
258296
self._emit_manifest_debug_message(
259297
extra_args={
260298
"source_name": self.name,
@@ -355,14 +393,9 @@ def spec(self, logger: logging.Logger) -> ConnectorSpecification:
355393
}
356394
)
357395

358-
spec = self._source_config.get("spec")
359-
if spec:
360-
if "type" not in spec:
361-
spec["type"] = "Spec"
362-
spec_component = self._constructor.create_component(SpecModel, spec, dict())
363-
return spec_component.generate_spec()
364-
else:
365-
return super().spec(logger)
396+
return (
397+
self._spec_component.generate_spec() if self._spec_component else super().spec(logger)
398+
)
366399

367400
def check(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteConnectionStatus:
368401
self._configure_logger_level(logger)

airbyte_cdk/sources/declarative/spec/spec.py

Lines changed: 5 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -1,28 +1,21 @@
11
#
2-
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
2+
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
33
#
44

5-
import json
65
from dataclasses import InitVar, dataclass, field
76
from typing import Any, List, Mapping, MutableMapping, Optional
87

9-
import orjson
10-
11-
from airbyte_cdk.config_observation import create_connector_config_control_message
12-
from airbyte_cdk.entrypoint import AirbyteEntrypoint
138
from airbyte_cdk.models import (
149
AdvancedAuth,
1510
ConnectorSpecification,
1611
ConnectorSpecificationSerializer,
1712
)
18-
from airbyte_cdk.models.airbyte_protocol_serializers import AirbyteMessageSerializer
1913
from airbyte_cdk.sources.declarative.models.declarative_component_schema import AuthFlow
2014
from airbyte_cdk.sources.declarative.transformations.config_transformations.config_transformation import (
2115
ConfigTransformation,
2216
)
2317
from airbyte_cdk.sources.declarative.validators.validator import Validator
2418
from airbyte_cdk.sources.message.repository import InMemoryMessageRepository, MessageRepository
25-
from airbyte_cdk.sources.source import Source
2619

2720

2821
@dataclass
@@ -48,7 +41,6 @@ class Spec:
4841
config_migrations: List[ConfigMigration] = field(default_factory=list)
4942
config_transformations: List[ConfigTransformation] = field(default_factory=list)
5043
config_validations: List[Validator] = field(default_factory=list)
51-
message_repository: MessageRepository = InMemoryMessageRepository()
5244

5345
def generate_spec(self) -> ConnectorSpecification:
5446
"""
@@ -69,34 +61,15 @@ def generate_spec(self) -> ConnectorSpecification:
6961
# We remap these keys to camel case because that's the existing format expected by the rest of the platform
7062
return ConnectorSpecificationSerializer.load(obj)
7163

72-
def migrate_config(
73-
self, args: List[str], source: Source, config: MutableMapping[str, Any]
74-
) -> None:
64+
def migrate_config(self, config: MutableMapping[str, Any]) -> None:
7565
"""
76-
Apply all specified config transformations to the provided config and save the modified config to the given path and emit a control message.
66+
Apply all specified config transformations to the provided config and emit a control message.
7767
78-
:param args: Command line arguments
79-
:param source: Source instance
8068
:param config: The user-provided config to migrate
8169
"""
82-
config_path = AirbyteEntrypoint(source).extract_config(args)
83-
84-
if not config_path:
85-
return
86-
87-
mutable_config = dict(config)
8870
for migration in self.config_migrations:
8971
for transformation in migration.transformations:
90-
transformation.transform(mutable_config)
91-
92-
if mutable_config != config:
93-
with open(config_path, "w") as f:
94-
json.dump(mutable_config, f)
95-
self.message_repository.emit_message(
96-
create_connector_config_control_message(mutable_config)
97-
)
98-
for message in self.message_repository.consume_queue():
99-
print(orjson.dumps(AirbyteMessageSerializer.dump(message)).decode())
72+
transformation.transform(config)
10073

10174
def transform_config(self, config: MutableMapping[str, Any]) -> None:
10275
"""
@@ -107,7 +80,7 @@ def transform_config(self, config: MutableMapping[str, Any]) -> None:
10780
for transformation in self.config_transformations:
10881
transformation.transform(config)
10982

110-
def validate_config(self, config: MutableMapping[str, Any]) -> None:
83+
def validate_config(self, config: Mapping[str, Any]) -> None:
11184
"""
11285
Apply all config validations to the provided config.
11386

airbyte_cdk/sources/declarative/yaml_declarative_source.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ def __init__(
2424
catalog: Optional[ConfiguredAirbyteCatalog] = None,
2525
config: Optional[Mapping[str, Any]] = None,
2626
state: Optional[List[AirbyteStateMessage]] = None,
27+
config_path: Optional[str] = None,
2728
) -> None:
2829
"""
2930
:param path_to_yaml: Path to the yaml file describing the source
@@ -36,6 +37,7 @@ def __init__(
3637
config=config or {},
3738
state=state or [],
3839
source_config=source_config,
40+
config_path=config_path,
3941
)
4042

4143
def _read_and_parse_yaml_file(self, path_to_yaml_file: str) -> ConnectionDefinition:

unit_tests/sources/declarative/spec/test_spec.py

Lines changed: 0 additions & 104 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,6 @@
22
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
33
#
44

5-
from unittest.mock import Mock, mock_open
6-
75
import pytest
86

97
from airbyte_cdk.models import (
@@ -161,108 +159,6 @@ def test_spec(spec, expected_connection_specification) -> None:
161159
assert spec.generate_spec() == expected_connection_specification
162160

163161

164-
@pytest.fixture
165-
def migration_mocks(monkeypatch):
166-
mock_message_repository = Mock()
167-
mock_message_repository.consume_queue.return_value = [Mock()]
168-
169-
mock_source = Mock()
170-
mock_entrypoint = Mock()
171-
mock_entrypoint.extract_config.return_value = "/fake/config/path"
172-
monkeypatch.setattr(
173-
"airbyte_cdk.sources.declarative.spec.spec.AirbyteEntrypoint", lambda _: mock_entrypoint
174-
)
175-
176-
_mock_open = mock_open()
177-
mock_json_dump = Mock()
178-
mock_print = Mock()
179-
mock_serializer_dump = Mock()
180-
181-
mock_decoded_bytes = Mock()
182-
mock_decoded_bytes.decode.return_value = "decoded_message"
183-
mock_orjson_dumps = Mock(return_value=mock_decoded_bytes)
184-
185-
monkeypatch.setattr("builtins.open", _mock_open)
186-
monkeypatch.setattr("json.dump", mock_json_dump)
187-
monkeypatch.setattr("builtins.print", mock_print)
188-
monkeypatch.setattr(
189-
"airbyte_cdk.models.airbyte_protocol_serializers.AirbyteMessageSerializer.dump",
190-
mock_serializer_dump,
191-
)
192-
monkeypatch.setattr("airbyte_cdk.sources.declarative.spec.spec.orjson.dumps", mock_orjson_dumps)
193-
194-
return {
195-
"message_repository": mock_message_repository,
196-
"source": mock_source,
197-
"open": _mock_open,
198-
"json_dump": mock_json_dump,
199-
"print": mock_print,
200-
"serializer_dump": mock_serializer_dump,
201-
"orjson_dumps": mock_orjson_dumps,
202-
"decoded_bytes": mock_decoded_bytes,
203-
}
204-
205-
206-
def test_given_unmigrated_config_when_migrating_then_config_is_migrated(migration_mocks) -> None:
207-
input_config = {"planet": "CRSC"}
208-
209-
spec = component_spec(
210-
connection_specification={},
211-
parameters={},
212-
config_migrations=[
213-
ConfigMigration(
214-
description="Test migration",
215-
transformations=[
216-
ConfigRemapField(
217-
map={"CRSC": "Coruscant"}, field_path=["planet"], config=input_config
218-
)
219-
],
220-
)
221-
],
222-
)
223-
spec.message_repository = migration_mocks["message_repository"]
224-
225-
spec.migrate_config(["spec"], migration_mocks["source"], input_config)
226-
227-
migration_mocks["message_repository"].emit_message.assert_called_once()
228-
migration_mocks["open"].assert_called_once_with("/fake/config/path", "w")
229-
migration_mocks["json_dump"].assert_called_once()
230-
migration_mocks["print"].assert_called()
231-
migration_mocks["serializer_dump"].assert_called()
232-
migration_mocks["orjson_dumps"].assert_called()
233-
migration_mocks["decoded_bytes"].decode.assert_called()
234-
235-
236-
def test_given_already_migrated_config_no_control_message_is_emitted(migration_mocks) -> None:
237-
input_config = {"planet": "Coruscant"}
238-
239-
spec = component_spec(
240-
connection_specification={},
241-
parameters={},
242-
config_migrations=[
243-
ConfigMigration(
244-
description="Test migration",
245-
transformations=[
246-
ConfigRemapField(
247-
map={"CRSC": "Coruscant"}, field_path=["planet"], config=input_config
248-
)
249-
],
250-
)
251-
],
252-
)
253-
spec.message_repository = migration_mocks["message_repository"]
254-
255-
spec.migrate_config(["spec"], migration_mocks["source"], input_config)
256-
257-
migration_mocks["message_repository"].emit_message.assert_not_called()
258-
migration_mocks["open"].assert_not_called()
259-
migration_mocks["json_dump"].assert_not_called()
260-
migration_mocks["print"].assert_not_called()
261-
migration_mocks["serializer_dump"].assert_not_called()
262-
migration_mocks["orjson_dumps"].assert_not_called()
263-
migration_mocks["decoded_bytes"].decode.assert_not_called()
264-
265-
266162
def test_given_list_of_transformations_when_transform_config_then_config_is_transformed() -> None:
267163
input_config = {"planet_code": "CRSC"}
268164
expected_config = {

0 commit comments

Comments
 (0)