diff --git a/airbyte_cdk/sources/embedded/__init__.py b/airbyte_cdk/sources/embedded/__init__.py deleted file mode 100644 index 46b737675..000000000 --- a/airbyte_cdk/sources/embedded/__init__.py +++ /dev/null @@ -1,3 +0,0 @@ -# -# Copyright (c) 2021 Airbyte, Inc., all rights reserved. -# diff --git a/airbyte_cdk/sources/embedded/base_integration.py b/airbyte_cdk/sources/embedded/base_integration.py deleted file mode 100644 index 77917b0a1..000000000 --- a/airbyte_cdk/sources/embedded/base_integration.py +++ /dev/null @@ -1,61 +0,0 @@ -# -# Copyright (c) 2023 Airbyte, Inc., all rights reserved. -# - -from abc import ABC, abstractmethod -from typing import Generic, Iterable, Optional, TypeVar - -from airbyte_cdk.connector import TConfig -from airbyte_cdk.models import AirbyteRecordMessage, AirbyteStateMessage, SyncMode, Type -from airbyte_cdk.sources.embedded.catalog import ( - create_configured_catalog, - get_stream, - get_stream_names, -) -from airbyte_cdk.sources.embedded.runner import SourceRunner -from airbyte_cdk.sources.embedded.tools import get_defined_id -from airbyte_cdk.sources.utils.schema_helpers import check_config_against_spec_or_exit - -TOutput = TypeVar("TOutput") - - -class BaseEmbeddedIntegration(ABC, Generic[TConfig, TOutput]): - def __init__(self, runner: SourceRunner[TConfig], config: TConfig): - check_config_against_spec_or_exit(config, runner.spec()) - - self.source = runner - self.config = config - - self.last_state: Optional[AirbyteStateMessage] = None - - @abstractmethod - def _handle_record(self, record: AirbyteRecordMessage, id: Optional[str]) -> Optional[TOutput]: - """ - Turn an Airbyte record into the appropriate output type for the integration. - """ - pass - - def _load_data( - self, stream_name: str, state: Optional[AirbyteStateMessage] = None - ) -> Iterable[TOutput]: - catalog = self.source.discover(self.config) - stream = get_stream(catalog, stream_name) - if not stream: - raise ValueError( - f"Stream {stream_name} not found, the following streams are available: {', '.join(get_stream_names(catalog))}" - ) - if SyncMode.incremental not in stream.supported_sync_modes: - configured_catalog = create_configured_catalog(stream, sync_mode=SyncMode.full_refresh) - else: - configured_catalog = create_configured_catalog(stream, sync_mode=SyncMode.incremental) - - for message in self.source.read(self.config, configured_catalog, state): - if message.type == Type.RECORD: - output = self._handle_record( - message.record, - get_defined_id(stream, message.record.data), # type: ignore[union-attr, arg-type] - ) - if output: - yield output - elif message.type is Type.STATE and message.state: - self.last_state = message.state diff --git a/airbyte_cdk/sources/embedded/catalog.py b/airbyte_cdk/sources/embedded/catalog.py deleted file mode 100644 index 62c7a623d..000000000 --- a/airbyte_cdk/sources/embedded/catalog.py +++ /dev/null @@ -1,57 +0,0 @@ -# -# Copyright (c) 2023 Airbyte, Inc., all rights reserved. -# - -from typing import List, Optional - -from airbyte_cdk.models import ( - AirbyteCatalog, - AirbyteStream, - ConfiguredAirbyteCatalog, - ConfiguredAirbyteStream, - DestinationSyncMode, - SyncMode, -) -from airbyte_cdk.sources.embedded.tools import get_first - - -def get_stream(catalog: AirbyteCatalog, stream_name: str) -> Optional[AirbyteStream]: - return get_first(catalog.streams, lambda s: s.name == stream_name) - - -def get_stream_names(catalog: AirbyteCatalog) -> List[str]: - return [stream.name for stream in catalog.streams] - - -def to_configured_stream( - stream: AirbyteStream, - sync_mode: SyncMode = SyncMode.full_refresh, - destination_sync_mode: DestinationSyncMode = DestinationSyncMode.append, - cursor_field: Optional[List[str]] = None, - primary_key: Optional[List[List[str]]] = None, -) -> ConfiguredAirbyteStream: - return ConfiguredAirbyteStream( - stream=stream, - sync_mode=sync_mode, - destination_sync_mode=destination_sync_mode, - cursor_field=cursor_field, - primary_key=primary_key, - ) - - -def to_configured_catalog( - configured_streams: List[ConfiguredAirbyteStream], -) -> ConfiguredAirbyteCatalog: - return ConfiguredAirbyteCatalog(streams=configured_streams) - - -def create_configured_catalog( - stream: AirbyteStream, sync_mode: SyncMode = SyncMode.full_refresh -) -> ConfiguredAirbyteCatalog: - configured_streams = [ - to_configured_stream( - stream, sync_mode=sync_mode, primary_key=stream.source_defined_primary_key - ) - ] - - return to_configured_catalog(configured_streams) diff --git a/airbyte_cdk/sources/embedded/runner.py b/airbyte_cdk/sources/embedded/runner.py deleted file mode 100644 index 43217f156..000000000 --- a/airbyte_cdk/sources/embedded/runner.py +++ /dev/null @@ -1,57 +0,0 @@ -# -# Copyright (c) 2023 Airbyte, Inc., all rights reserved. -# - - -import logging -from abc import ABC, abstractmethod -from typing import Generic, Iterable, Optional - -from airbyte_cdk.connector import TConfig -from airbyte_cdk.models import ( - AirbyteCatalog, - AirbyteMessage, - AirbyteStateMessage, - ConfiguredAirbyteCatalog, - ConnectorSpecification, -) -from airbyte_cdk.sources.source import Source - - -class SourceRunner(ABC, Generic[TConfig]): - @abstractmethod - def spec(self) -> ConnectorSpecification: - pass - - @abstractmethod - def discover(self, config: TConfig) -> AirbyteCatalog: - pass - - @abstractmethod - def read( - self, - config: TConfig, - catalog: ConfiguredAirbyteCatalog, - state: Optional[AirbyteStateMessage], - ) -> Iterable[AirbyteMessage]: - pass - - -class CDKRunner(SourceRunner[TConfig]): - def __init__(self, source: Source, name: str): - self._source = source - self._logger = logging.getLogger(name) - - def spec(self) -> ConnectorSpecification: - return self._source.spec(self._logger) - - def discover(self, config: TConfig) -> AirbyteCatalog: - return self._source.discover(self._logger, config) - - def read( - self, - config: TConfig, - catalog: ConfiguredAirbyteCatalog, - state: Optional[AirbyteStateMessage], - ) -> Iterable[AirbyteMessage]: - return self._source.read(self._logger, config, catalog, state=[state] if state else []) diff --git a/airbyte_cdk/sources/embedded/tools.py b/airbyte_cdk/sources/embedded/tools.py deleted file mode 100644 index 1ddb29b3a..000000000 --- a/airbyte_cdk/sources/embedded/tools.py +++ /dev/null @@ -1,27 +0,0 @@ -# -# Copyright (c) 2023 Airbyte, Inc., all rights reserved. -# - -from typing import Any, Callable, Dict, Iterable, Optional - -import dpath - -from airbyte_cdk.models import AirbyteStream - - -def get_first( - iterable: Iterable[Any], predicate: Callable[[Any], bool] = lambda m: True -) -> Optional[Any]: - return next(filter(predicate, iterable), None) - - -def get_defined_id(stream: AirbyteStream, data: Dict[str, Any]) -> Optional[str]: - if not stream.source_defined_primary_key: - return None - primary_key = [] - for key in stream.source_defined_primary_key: - try: - primary_key.append(str(dpath.get(data, key))) - except KeyError: - primary_key.append("__not_found__") - return "_".join(primary_key) diff --git a/unit_tests/sources/declarative/resolvers/test_config_components_resolver.py b/unit_tests/sources/declarative/resolvers/test_config_components_resolver.py index 75a61609b..9ee73976b 100644 --- a/unit_tests/sources/declarative/resolvers/test_config_components_resolver.py +++ b/unit_tests/sources/declarative/resolvers/test_config_components_resolver.py @@ -7,17 +7,41 @@ import pytest -from airbyte_cdk.models import Type +from airbyte_cdk.models import ( + ConfiguredAirbyteCatalog, + ConfiguredAirbyteStream, + DestinationSyncMode, + Type, +) from airbyte_cdk.sources.declarative.concurrent_declarative_source import ( ConcurrentDeclarativeSource, ) -from airbyte_cdk.sources.embedded.catalog import ( - to_configured_catalog, - to_configured_stream, -) from airbyte_cdk.test.mock_http import HttpMocker, HttpRequest, HttpResponse from airbyte_cdk.utils.traced_exception import AirbyteTracedException + +def to_configured_stream( + stream, + sync_mode=None, + destination_sync_mode=DestinationSyncMode.append, + cursor_field=None, + primary_key=None, +) -> ConfiguredAirbyteStream: + return ConfiguredAirbyteStream( + stream=stream, + sync_mode=sync_mode, + destination_sync_mode=destination_sync_mode, + cursor_field=cursor_field, + primary_key=primary_key, + ) + + +def to_configured_catalog( + configured_streams, +) -> ConfiguredAirbyteCatalog: + return ConfiguredAirbyteCatalog(streams=configured_streams) + + _CONFIG = { "start_date": "2024-07-01T00:00:00.000Z", "custom_streams": [ diff --git a/unit_tests/sources/declarative/resolvers/test_http_components_resolver.py b/unit_tests/sources/declarative/resolvers/test_http_components_resolver.py index 357dcceef..09d069bff 100644 --- a/unit_tests/sources/declarative/resolvers/test_http_components_resolver.py +++ b/unit_tests/sources/declarative/resolvers/test_http_components_resolver.py @@ -8,7 +8,12 @@ import pytest -from airbyte_cdk.models import Type +from airbyte_cdk.models import ( + ConfiguredAirbyteCatalog, + ConfiguredAirbyteStream, + DestinationSyncMode, + Type, +) from airbyte_cdk.sources.declarative.concurrent_declarative_source import ( ConcurrentDeclarativeSource, ) @@ -17,13 +22,32 @@ ComponentMappingDefinition, HttpComponentsResolver, ) -from airbyte_cdk.sources.embedded.catalog import ( - to_configured_catalog, - to_configured_stream, -) from airbyte_cdk.test.mock_http import HttpMocker, HttpRequest, HttpResponse from airbyte_cdk.utils.traced_exception import AirbyteTracedException + +def to_configured_stream( + stream, + sync_mode=None, + destination_sync_mode=DestinationSyncMode.append, + cursor_field=None, + primary_key=None, +) -> ConfiguredAirbyteStream: + return ConfiguredAirbyteStream( + stream=stream, + sync_mode=sync_mode, + destination_sync_mode=destination_sync_mode, + cursor_field=cursor_field, + primary_key=primary_key, + ) + + +def to_configured_catalog( + configured_streams, +) -> ConfiguredAirbyteCatalog: + return ConfiguredAirbyteCatalog(streams=configured_streams) + + _CONFIG = {"start_date": "2024-07-01T00:00:00.000Z"} _MANIFEST = { diff --git a/unit_tests/sources/embedded/test_embedded_integration.py b/unit_tests/sources/embedded/test_embedded_integration.py deleted file mode 100644 index f8e11cff1..000000000 --- a/unit_tests/sources/embedded/test_embedded_integration.py +++ /dev/null @@ -1,176 +0,0 @@ -# -# Copyright (c) 2023 Airbyte, Inc., all rights reserved. -# - -import unittest -from typing import Any, Mapping, Optional -from unittest.mock import MagicMock - -from airbyte_cdk.models import ( - AirbyteCatalog, - AirbyteLogMessage, - AirbyteMessage, - AirbyteRecordMessage, - AirbyteStateMessage, - AirbyteStream, - ConfiguredAirbyteCatalog, - ConfiguredAirbyteStream, - ConnectorSpecification, - DestinationSyncMode, - Level, - SyncMode, - Type, -) -from airbyte_cdk.sources.embedded.base_integration import BaseEmbeddedIntegration -from airbyte_cdk.utils import AirbyteTracedException - - -class TestIntegration(BaseEmbeddedIntegration): - def _handle_record(self, record: AirbyteRecordMessage, id: Optional[str]) -> Mapping[str, Any]: - return {"data": record.data, "id": id} - - -class EmbeddedIntegrationTestCase(unittest.TestCase): - def setUp(self): - self.source_class = MagicMock() - self.source = MagicMock() - self.source_class.return_value = self.source - self.source.spec.return_value = ConnectorSpecification( - connectionSpecification={ - "properties": { - "test": { - "type": "string", - } - } - } - ) - self.config = {"test": "abc"} - self.integration = TestIntegration(self.source, self.config) - self.stream1 = AirbyteStream( - name="test", - source_defined_primary_key=[["test"]], - json_schema={}, - supported_sync_modes=[SyncMode.full_refresh, SyncMode.incremental], - ) - self.stream2 = AirbyteStream( - name="test2", json_schema={}, supported_sync_modes=[SyncMode.full_refresh] - ) - self.source.discover.return_value = AirbyteCatalog(streams=[self.stream2, self.stream1]) - - def test_integration(self): - self.source.read.return_value = [ - AirbyteMessage(type=Type.LOG, log=AirbyteLogMessage(level=Level.INFO, message="test")), - AirbyteMessage( - type=Type.RECORD, - record=AirbyteRecordMessage(stream="test", data={"test": 1}, emitted_at=1), - ), - AirbyteMessage( - type=Type.RECORD, - record=AirbyteRecordMessage(stream="test", data={"test": 2}, emitted_at=2), - ), - AirbyteMessage( - type=Type.RECORD, - record=AirbyteRecordMessage(stream="test", data={"test": 3}, emitted_at=3), - ), - ] - result = list(self.integration._load_data("test", None)) - self.assertEqual( - result, - [ - {"data": {"test": 1}, "id": "1"}, - {"data": {"test": 2}, "id": "2"}, - {"data": {"test": 3}, "id": "3"}, - ], - ) - self.source.discover.assert_called_once_with(self.config) - self.source.read.assert_called_once_with( - self.config, - ConfiguredAirbyteCatalog( - streams=[ - ConfiguredAirbyteStream( - stream=self.stream1, - sync_mode=SyncMode.incremental, - destination_sync_mode=DestinationSyncMode.append, - primary_key=[["test"]], - ) - ] - ), - None, - ) - - def test_failed_check(self): - self.config = {"test": 123} - with self.assertRaises(AirbyteTracedException) as error: - TestIntegration(self.source, self.config) - assert str(error.exception) == "123 is not of type 'string'" - - def test_state(self): - state = AirbyteStateMessage(data={}) - self.source.read.return_value = [ - AirbyteMessage(type=Type.LOG, log=AirbyteLogMessage(level=Level.INFO, message="test")), - AirbyteMessage( - type=Type.RECORD, - record=AirbyteRecordMessage(stream="test", data={"test": 1}, emitted_at=1), - ), - AirbyteMessage(type=Type.STATE, state=state), - ] - result = list(self.integration._load_data("test", None)) - self.assertEqual( - result, - [ - {"data": {"test": 1}, "id": "1"}, - ], - ) - self.integration.last_state = state - - def test_incremental(self): - state = AirbyteStateMessage(data={}) - list(self.integration._load_data("test", state)) - self.source.read.assert_called_once_with( - self.config, - ConfiguredAirbyteCatalog( - streams=[ - ConfiguredAirbyteStream( - stream=self.stream1, - sync_mode=SyncMode.incremental, - destination_sync_mode=DestinationSyncMode.append, - primary_key=[["test"]], - ) - ] - ), - state, - ) - - def test_incremental_without_state(self): - list(self.integration._load_data("test")) - self.source.read.assert_called_once_with( - self.config, - ConfiguredAirbyteCatalog( - streams=[ - ConfiguredAirbyteStream( - stream=self.stream1, - sync_mode=SyncMode.incremental, - destination_sync_mode=DestinationSyncMode.append, - primary_key=[["test"]], - ) - ] - ), - None, - ) - - def test_incremental_unsupported(self): - state = AirbyteStateMessage(data={}) - list(self.integration._load_data("test2", state)) - self.source.read.assert_called_once_with( - self.config, - ConfiguredAirbyteCatalog( - streams=[ - ConfiguredAirbyteStream( - stream=self.stream2, - sync_mode=SyncMode.full_refresh, - destination_sync_mode=DestinationSyncMode.append, - ) - ] - ), - state, - )