From 2bd8dcf9c81122508029460a176ecf83e1a57345 Mon Sep 17 00:00:00 2001 From: darynaishchenko Date: Wed, 5 Feb 2025 19:44:56 +0200 Subject: [PATCH 1/9] call state migration in ConcurrentDeclarativeSource --- .../sources/declarative/concurrent_declarative_source.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/airbyte_cdk/sources/declarative/concurrent_declarative_source.py b/airbyte_cdk/sources/declarative/concurrent_declarative_source.py index d4ecc0084..5c00edd50 100644 --- a/airbyte_cdk/sources/declarative/concurrent_declarative_source.py +++ b/airbyte_cdk/sources/declarative/concurrent_declarative_source.py @@ -224,6 +224,9 @@ def _group_streams( stream_state = self._connector_state_manager.get_stream_state( stream_name=declarative_stream.name, namespace=declarative_stream.namespace ) + for state_migration in declarative_stream.state_migrations: + if state_migration.should_migrate(stream_state): + stream_state = state_migration.migrate(stream_state) retriever = self._get_retriever(declarative_stream, stream_state) @@ -331,6 +334,10 @@ def _group_streams( stream_state = self._connector_state_manager.get_stream_state( stream_name=declarative_stream.name, namespace=declarative_stream.namespace ) + for state_migration in declarative_stream.state_migrations: + if state_migration.should_migrate(stream_state): + stream_state = state_migration.migrate(stream_state) + partition_router = declarative_stream.retriever.stream_slicer._partition_router perpartition_cursor = ( From 327f5fca96f50860025a727f285d4fc816517635 Mon Sep 17 00:00:00 2001 From: darynaishchenko Date: Wed, 5 Feb 2025 19:45:43 +0200 Subject: [PATCH 2/9] added unit tests --- .../declarative/custom_state_migration.py | 47 ++++ .../test_concurrent_declarative_source.py | 229 ++++++++++++++++++ 2 files changed, 276 insertions(+) create mode 100644 unit_tests/sources/declarative/custom_state_migration.py diff --git a/unit_tests/sources/declarative/custom_state_migration.py b/unit_tests/sources/declarative/custom_state_migration.py new file mode 100644 index 000000000..86ca4a5c4 --- /dev/null +++ b/unit_tests/sources/declarative/custom_state_migration.py @@ -0,0 +1,47 @@ +# +# Copyright (c) 2024 Airbyte, Inc., all rights reserved. +# + +from typing import Any, Mapping + +from airbyte_cdk.sources.declarative.declarative_stream import DeclarativeStream +from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString +from airbyte_cdk.sources.declarative.migrations.state_migration import StateMigration +from airbyte_cdk.sources.types import Config + + +class CustomStateMigration(StateMigration): + declarative_stream: DeclarativeStream + config: Config + + def __init__(self, declarative_stream: DeclarativeStream, config: Config): + self._config = config + self.declarative_stream = declarative_stream + self._cursor = declarative_stream.incremental_sync + self._parameters = declarative_stream.parameters + self._cursor_field = InterpolatedString.create( + self._cursor.cursor_field, parameters=self._parameters + ).eval(self._config) + + def should_migrate(self, stream_state: Mapping[str, Any]) -> bool: + return True + + def migrate(self, stream_state: Mapping[str, Any]) -> Mapping[str, Any]: + if not self.should_migrate(stream_state): + return stream_state + updated_at = stream_state[self._cursor.cursor_field] + + migrated_stream_state = { + "states": [ + { + "partition": {"type": "type_1"}, + "cursor": {self._cursor.cursor_field: updated_at}, + }, + { + "partition": {"type": "type_2"}, + "cursor": {self._cursor.cursor_field: updated_at}, + }, + ] + } + + return migrated_stream_state diff --git a/unit_tests/sources/declarative/test_concurrent_declarative_source.py b/unit_tests/sources/declarative/test_concurrent_declarative_source.py index 1877e11bb..8553df964 100644 --- a/unit_tests/sources/declarative/test_concurrent_declarative_source.py +++ b/unit_tests/sources/declarative/test_concurrent_declarative_source.py @@ -1231,6 +1231,235 @@ def test_read_with_concurrent_and_synchronous_streams_with_sequential_state(): assert len(party_members_skills_records) == 9 +@freezegun.freeze_time(_NOW) +def test_read_with_state_when_state_migration_was_provided(): + manifest = { + "version": "5.0.0", + "definitions": { + "selector": { + "type": "RecordSelector", + "extractor": {"type": "DpathExtractor", "field_path": []}, + }, + "requester": { + "type": "HttpRequester", + "url_base": "https://persona.metaverse.com", + "http_method": "GET", + "authenticator": { + "type": "BasicHttpAuthenticator", + "username": "{{ config['api_key'] }}", + "password": "{{ config['secret_key'] }}", + }, + "error_handler": { + "type": "DefaultErrorHandler", + "response_filters": [ + { + "http_codes": [403], + "action": "FAIL", + "failure_type": "config_error", + "error_message": "Access denied due to lack of permission or invalid API/Secret key or wrong data region.", + }, + { + "http_codes": [404], + "action": "IGNORE", + "error_message": "No data available for the time range requested.", + }, + ], + }, + }, + "retriever": { + "type": "SimpleRetriever", + "record_selector": {"$ref": "#/definitions/selector"}, + "paginator": {"type": "NoPagination"}, + "requester": {"$ref": "#/definitions/requester"}, + }, + "incremental_cursor": { + "type": "DatetimeBasedCursor", + "start_datetime": { + "datetime": "{{ format_datetime(config['start_date'], '%Y-%m-%d') }}" + }, + "end_datetime": {"datetime": "{{ now_utc().strftime('%Y-%m-%d') }}"}, + "datetime_format": "%Y-%m-%d", + "cursor_datetime_formats": ["%Y-%m-%d", "%Y-%m-%dT%H:%M:%S"], + "cursor_granularity": "P1D", + "step": "P15D", + "cursor_field": "updated_at", + "lookback_window": "P5D", + "start_time_option": { + "type": "RequestOption", + "field_name": "start", + "inject_into": "request_parameter", + }, + "end_time_option": { + "type": "RequestOption", + "field_name": "end", + "inject_into": "request_parameter", + }, + }, + "base_stream": {"retriever": {"$ref": "#/definitions/retriever"}}, + "base_incremental_stream": { + "retriever": { + "$ref": "#/definitions/retriever", + "requester": {"$ref": "#/definitions/requester"}, + }, + "incremental_sync": {"$ref": "#/definitions/incremental_cursor"}, + }, + "party_members_stream": { + "$ref": "#/definitions/base_incremental_stream", + "retriever": { + "$ref": "#/definitions/base_incremental_stream/retriever", + "requester": { + "$ref": "#/definitions/requester", + "request_parameters": {"filter": "{{stream_partition['type']}}"}, + }, + "record_selector": {"$ref": "#/definitions/selector"}, + "partition_router": [ + { + "type": "ListPartitionRouter", + "values": ["type_1", "type_2"], + "cursor_field": "type", + } + ], + }, + "$parameters": { + "name": "party_members", + "primary_key": "id", + "path": "/party_members", + }, + "state_migrations": [ + { + "type": "CustomStateMigration", + "class_name": "unit_tests.sources.declarative.custom_state_migration.CustomStateMigration", + } + ], + "schema_loader": { + "type": "InlineSchemaLoader", + "schema": { + "$schema": "https://json-schema.org/draft-07/schema#", + "type": "object", + "properties": { + "id": { + "description": "The identifier", + "type": ["null", "string"], + }, + "name": { + "description": "The name of the party member", + "type": ["null", "string"], + }, + }, + }, + }, + }, + }, + "streams": [ + "#/definitions/party_members_stream", + ], + "check": {"stream_names": ["party_members", "locations"]}, + "concurrency_level": { + "type": "ConcurrencyLevel", + "default_concurrency": "{{ config['num_workers'] or 10 }}", + "max_concurrency": 25, + }, + } + state = [ + AirbyteStateMessage( + type=AirbyteStateType.STREAM, + stream=AirbyteStreamState( + stream_descriptor=StreamDescriptor(name="party_members", namespace=None), + stream_state=AirbyteStateBlob(updated_at="2024-08-21"), + ), + ), + ] + catalog = ConfiguredAirbyteCatalog( + streams=[ + ConfiguredAirbyteStream( + stream=AirbyteStream( + name="party_members", + json_schema={}, + supported_sync_modes=[SyncMode.incremental], + ), + sync_mode=SyncMode.incremental, + destination_sync_mode=DestinationSyncMode.append, + ) + ] + ) + source = ConcurrentDeclarativeSource( + source_config=manifest, config=_CONFIG, catalog=_CATALOG, state=state + ) + party_members_slices_and_responses = [ + ( + {"start": "2024-08-16", "end": "2024-08-30", "filter": "type_1"}, + HttpResponse( + json.dumps( + [ + { + "id": "nijima", + "first_name": "makoto", + "last_name": "nijima", + "updated_at": "2024-08-10", + "type": 1, + } + ] + ) + ), + ), + ( + {"start": "2024-08-16", "end": "2024-08-30", "filter": "type_2"}, + HttpResponse( + json.dumps( + [ + { + "id": "nijima", + "first_name": "makoto", + "last_name": "nijima", + "updated_at": "2024-08-10", + "type": 2, + } + ] + ) + ), + ), + ( + {"start": "2024-08-31", "end": "2024-09-10", "filter": "type_1"}, + HttpResponse( + json.dumps( + [ + { + "id": "yoshizawa", + "first_name": "sumire", + "last_name": "yoshizawa", + "updated_at": "2024-09-10", + "type": 1, + } + ] + ) + ), + ), + ( + {"start": "2024-08-31", "end": "2024-09-10", "filter": "type_2"}, + HttpResponse( + json.dumps( + [ + { + "id": "yoshizawa", + "first_name": "sumire", + "last_name": "yoshizawa", + "updated_at": "2024-09-10", + "type": 2, + } + ] + ) + ), + ), + ] + with HttpMocker() as http_mocker: + _mock_party_members_requests(http_mocker, party_members_slices_and_responses) + messages = list( + source.read(logger=source.logger, config=_CONFIG, catalog=catalog, state=state) + ) + final_state = get_states_for_stream(stream_name="party_members", messages=messages) + assert state not in final_state + + @freezegun.freeze_time(_NOW) @patch( "airbyte_cdk.sources.streams.concurrent.state_converters.abstract_stream_state_converter.AbstractStreamStateConverter.__init__", From 08198acb9d924a15cad14826cfc2d688e5b568b8 Mon Sep 17 00:00:00 2001 From: darynaishchenko Date: Wed, 5 Feb 2025 20:07:41 +0200 Subject: [PATCH 3/9] types fix --- .../sources/declarative/concurrent_declarative_source.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/airbyte_cdk/sources/declarative/concurrent_declarative_source.py b/airbyte_cdk/sources/declarative/concurrent_declarative_source.py index 5c00edd50..8d66e48a4 100644 --- a/airbyte_cdk/sources/declarative/concurrent_declarative_source.py +++ b/airbyte_cdk/sources/declarative/concurrent_declarative_source.py @@ -226,7 +226,8 @@ def _group_streams( ) for state_migration in declarative_stream.state_migrations: if state_migration.should_migrate(stream_state): - stream_state = state_migration.migrate(stream_state) + # The state variable is expected to be mutable but the migrate method returns an immutable mapping. + stream_state = dict(state_migration.migrate(stream_state)) retriever = self._get_retriever(declarative_stream, stream_state) @@ -336,7 +337,8 @@ def _group_streams( ) for state_migration in declarative_stream.state_migrations: if state_migration.should_migrate(stream_state): - stream_state = state_migration.migrate(stream_state) + # The state variable is expected to be mutable but the migrate method returns an immutable mapping. + stream_state = dict(state_migration.migrate(stream_state)) partition_router = declarative_stream.retriever.stream_slicer._partition_router From e61916c285be9df794b665e7a85023a8aca7d59b Mon Sep 17 00:00:00 2001 From: darynaishchenko Date: Wed, 5 Feb 2025 21:07:17 +0200 Subject: [PATCH 4/9] updated unit test, refactor state migrations --- .../concurrent_declarative_source.py | 23 ++-- .../test_concurrent_declarative_source.py | 102 +++--------------- 2 files changed, 27 insertions(+), 98 deletions(-) diff --git a/airbyte_cdk/sources/declarative/concurrent_declarative_source.py b/airbyte_cdk/sources/declarative/concurrent_declarative_source.py index 8d66e48a4..b1745b0d4 100644 --- a/airbyte_cdk/sources/declarative/concurrent_declarative_source.py +++ b/airbyte_cdk/sources/declarative/concurrent_declarative_source.py @@ -3,7 +3,7 @@ # import logging -from typing import Any, Generic, Iterator, List, Mapping, Optional, Tuple +from typing import Any, Generic, Iterator, List, Mapping, Optional, Tuple, MutableMapping from airbyte_cdk.models import ( AirbyteCatalog, @@ -224,10 +224,7 @@ def _group_streams( stream_state = self._connector_state_manager.get_stream_state( stream_name=declarative_stream.name, namespace=declarative_stream.namespace ) - for state_migration in declarative_stream.state_migrations: - if state_migration.should_migrate(stream_state): - # The state variable is expected to be mutable but the migrate method returns an immutable mapping. - stream_state = dict(state_migration.migrate(stream_state)) + stream_state = self._migrate_state(declarative_stream, stream_state) retriever = self._get_retriever(declarative_stream, stream_state) @@ -335,10 +332,7 @@ def _group_streams( stream_state = self._connector_state_manager.get_stream_state( stream_name=declarative_stream.name, namespace=declarative_stream.namespace ) - for state_migration in declarative_stream.state_migrations: - if state_migration.should_migrate(stream_state): - # The state variable is expected to be mutable but the migrate method returns an immutable mapping. - stream_state = dict(state_migration.migrate(stream_state)) + stream_state = self._migrate_state(declarative_stream, stream_state) partition_router = declarative_stream.retriever.stream_slicer._partition_router @@ -530,3 +524,14 @@ def _remove_concurrent_streams_from_catalog( if stream.stream.name not in concurrent_stream_names ] ) + + @staticmethod + def _migrate_state( + declarative_stream: DeclarativeStream, stream_state: MutableMapping[str, Any] + ) -> MutableMapping[str, Any]: + for state_migration in declarative_stream.state_migrations: + if state_migration.should_migrate(stream_state): + # The state variable is expected to be mutable but the migrate method returns an immutable mapping. + stream_state = dict(state_migration.migrate(stream_state)) + + return stream_state diff --git a/unit_tests/sources/declarative/test_concurrent_declarative_source.py b/unit_tests/sources/declarative/test_concurrent_declarative_source.py index 8553df964..82a564b59 100644 --- a/unit_tests/sources/declarative/test_concurrent_declarative_source.py +++ b/unit_tests/sources/declarative/test_concurrent_declarative_source.py @@ -1231,8 +1231,7 @@ def test_read_with_concurrent_and_synchronous_streams_with_sequential_state(): assert len(party_members_skills_records) == 9 -@freezegun.freeze_time(_NOW) -def test_read_with_state_when_state_migration_was_provided(): +def test_concurrent_declarative_source_runs_state_migrations_provided_in_manifest(): manifest = { "version": "5.0.0", "definitions": { @@ -1360,104 +1359,29 @@ def test_read_with_state_when_state_migration_was_provided(): "max_concurrency": 25, }, } + state_blob = AirbyteStateBlob(updated_at="2024-08-21") state = [ AirbyteStateMessage( type=AirbyteStateType.STREAM, stream=AirbyteStreamState( stream_descriptor=StreamDescriptor(name="party_members", namespace=None), - stream_state=AirbyteStateBlob(updated_at="2024-08-21"), + stream_state=state_blob, ), ), ] - catalog = ConfiguredAirbyteCatalog( - streams=[ - ConfiguredAirbyteStream( - stream=AirbyteStream( - name="party_members", - json_schema={}, - supported_sync_modes=[SyncMode.incremental], - ), - sync_mode=SyncMode.incremental, - destination_sync_mode=DestinationSyncMode.append, - ) - ] - ) source = ConcurrentDeclarativeSource( source_config=manifest, config=_CONFIG, catalog=_CATALOG, state=state ) - party_members_slices_and_responses = [ - ( - {"start": "2024-08-16", "end": "2024-08-30", "filter": "type_1"}, - HttpResponse( - json.dumps( - [ - { - "id": "nijima", - "first_name": "makoto", - "last_name": "nijima", - "updated_at": "2024-08-10", - "type": 1, - } - ] - ) - ), - ), - ( - {"start": "2024-08-16", "end": "2024-08-30", "filter": "type_2"}, - HttpResponse( - json.dumps( - [ - { - "id": "nijima", - "first_name": "makoto", - "last_name": "nijima", - "updated_at": "2024-08-10", - "type": 2, - } - ] - ) - ), - ), - ( - {"start": "2024-08-31", "end": "2024-09-10", "filter": "type_1"}, - HttpResponse( - json.dumps( - [ - { - "id": "yoshizawa", - "first_name": "sumire", - "last_name": "yoshizawa", - "updated_at": "2024-09-10", - "type": 1, - } - ] - ) - ), - ), - ( - {"start": "2024-08-31", "end": "2024-09-10", "filter": "type_2"}, - HttpResponse( - json.dumps( - [ - { - "id": "yoshizawa", - "first_name": "sumire", - "last_name": "yoshizawa", - "updated_at": "2024-09-10", - "type": 2, - } - ] - ) - ), - ), - ] - with HttpMocker() as http_mocker: - _mock_party_members_requests(http_mocker, party_members_slices_and_responses) - messages = list( - source.read(logger=source.logger, config=_CONFIG, catalog=catalog, state=state) - ) - final_state = get_states_for_stream(stream_name="party_members", messages=messages) - assert state not in final_state + concurrent_streams, synchronous_streams = source._group_streams(_CONFIG) + assert concurrent_streams[0].cursor.state != state_blob.__dict__ + assert concurrent_streams[0].cursor.state == { + "lookback_window": 0, + "states": [ + {"cursor": {"updated_at": "2024-08-21"}, "partition": {"type": "type_1"}}, + {"cursor": {"updated_at": "2024-08-21"}, "partition": {"type": "type_2"}}, + ], + "use_global_cursor": False, + } @freezegun.freeze_time(_NOW) From 445246863b731c276f55fd4a13198571ec35f61d Mon Sep 17 00:00:00 2001 From: octavia-squidington-iii Date: Wed, 5 Feb 2025 19:11:24 +0000 Subject: [PATCH 5/9] Auto-fix lint and format issues --- .../sources/declarative/concurrent_declarative_source.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airbyte_cdk/sources/declarative/concurrent_declarative_source.py b/airbyte_cdk/sources/declarative/concurrent_declarative_source.py index b1745b0d4..96bd67c32 100644 --- a/airbyte_cdk/sources/declarative/concurrent_declarative_source.py +++ b/airbyte_cdk/sources/declarative/concurrent_declarative_source.py @@ -3,7 +3,7 @@ # import logging -from typing import Any, Generic, Iterator, List, Mapping, Optional, Tuple, MutableMapping +from typing import Any, Generic, Iterator, List, Mapping, MutableMapping, Optional, Tuple from airbyte_cdk.models import ( AirbyteCatalog, From e1b162618d346f2b57c622b6301dd657f539f539 Mon Sep 17 00:00:00 2001 From: darynaishchenko Date: Thu, 6 Feb 2025 12:37:39 +0200 Subject: [PATCH 6/9] run state migration in create_concurrent_cursor_from_datetime_based_cursor --- .../parsers/model_to_component_factory.py | 11 ++++ .../test_model_to_component_factory.py | 58 +++++++++++++++++++ .../test_concurrent_declarative_source.py | 16 +++-- 3 files changed, 76 insertions(+), 9 deletions(-) diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index a664b8530..9713aedaf 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -943,6 +943,7 @@ def create_concurrent_cursor_from_datetime_based_cursor( config: Config, message_repository: Optional[MessageRepository] = None, runtime_lookback_window: Optional[datetime.timedelta] = None, + stream_state_migrations: Optional[List[Any]] = None, **kwargs: Any, ) -> ConcurrentCursor: # Per-partition incremental streams can dynamically create child cursors which will pass their current @@ -953,6 +954,11 @@ def create_concurrent_cursor_from_datetime_based_cursor( if "stream_state" not in kwargs else kwargs["stream_state"] ) + if stream_state_migrations: + for state_migration in stream_state_migrations: + if state_migration.should_migrate(stream_state): + # The state variable is expected to be mutable but the migrate method returns an immutable mapping. + stream_state = dict(state_migration.migrate(stream_state)) component_type = component_definition.get("type") if component_definition.get("type") != model_type.__name__: @@ -1188,6 +1194,7 @@ def create_concurrent_cursor_from_perpartition_cursor( config: Config, stream_state: MutableMapping[str, Any], partition_router: PartitionRouter, + stream_state_migrations: Optional[List[Any]] = None, **kwargs: Any, ) -> ConcurrentPerPartitionCursor: component_type = component_definition.get("type") @@ -1236,6 +1243,7 @@ def create_concurrent_cursor_from_perpartition_cursor( stream_namespace=stream_namespace, config=config, message_repository=NoopMessageRepository(), + stream_state_migrations=stream_state_migrations, ) ) @@ -1697,6 +1705,7 @@ def _merge_stream_slicers( config=config or {}, stream_state={}, partition_router=stream_slicer, + stream_state_migrations=model.state_migrations, ) return self.create_concurrent_cursor_from_datetime_based_cursor( # type: ignore # This is a known issue that we are creating and returning a ConcurrentCursor which does not technically implement the (low-code) StreamSlicer. However, (low-code) StreamSlicer and ConcurrentCursor both implement StreamSlicer.stream_slices() which is the primary method needed for checkpointing model_type=DatetimeBasedCursorModel, @@ -1704,6 +1713,7 @@ def _merge_stream_slicers( stream_name=model.name or "", stream_namespace=None, config=config or {}, + stream_state_migrations=model.state_migrations, ) incremental_sync_model = model.incremental_sync @@ -1746,6 +1756,7 @@ def _merge_stream_slicers( stream_name=model.name or "", stream_namespace=None, config=config or {}, + stream_state_migrations=model.state_migrations, ) return ( self._create_component_from_model(model=model.incremental_sync, config=config) diff --git a/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py b/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py index 43564a5c8..4bf4b1079 100644 --- a/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py +++ b/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py @@ -3281,6 +3281,64 @@ def test_create_concurrent_cursor_from_datetime_based_cursor( assert getattr(concurrent_cursor, assertion_field) == expected_value +def test_create_concurrent_cursor_from_datetime_based_cursor_runs_state_migrations(): + class DummyStateMigration: + def should_migrate(self, stream_state: Mapping[str, Any]) -> bool: + return True + + def migrate(self, stream_state: Mapping[str, Any]) -> Mapping[str, Any]: + updated_at = stream_state["updated_at"] + return { + "states": [ + { + "partition": {"type": "type_1"}, + "cursor": {"updated_at": updated_at}, + }, + { + "partition": {"type": "type_2"}, + "cursor": {"updated_at": updated_at}, + }, + ] + } + + stream_name = "test" + config = { + "start_time": "2024-08-01T00:00:00.000000Z", + "end_time": "2024-09-01T00:00:00.000000Z", + } + stream_state = {"updated_at": "2025-01-01T00:00:00.000000Z"} + connector_builder_factory = ModelToComponentFactory(emit_connector_builder_messages=True) + connector_state_manager = ConnectorStateManager() + cursor_component_definition = { + "type": "DatetimeBasedCursor", + "cursor_field": "updated_at", + "datetime_format": "%Y-%m-%dT%H:%M:%S.%fZ", + "start_datetime": "{{ config['start_time'] }}", + "end_datetime": "{{ config['end_time'] }}", + "partition_field_start": "custom_start", + "partition_field_end": "custom_end", + "step": "P10D", + "cursor_granularity": "PT0.000001S", + "lookback_window": "P3D", + } + concurrent_cursor = ( + connector_builder_factory.create_concurrent_cursor_from_datetime_based_cursor( + state_manager=connector_state_manager, + model_type=DatetimeBasedCursorModel, + component_definition=cursor_component_definition, + stream_name=stream_name, + stream_namespace=None, + config=config, + stream_state=stream_state, + stream_state_migrations=[DummyStateMigration()], + ) + ) + assert concurrent_cursor.state["states"] == [ + {"cursor": {"updated_at": stream_state["updated_at"]}, "partition": {"type": "type_1"}}, + {"cursor": {"updated_at": stream_state["updated_at"]}, "partition": {"type": "type_2"}}, + ] + + def test_create_concurrent_cursor_uses_min_max_datetime_format_if_defined(): """ Validates a special case for when the start_time.datetime_format and end_time.datetime_format are defined, the date to diff --git a/unit_tests/sources/declarative/test_concurrent_declarative_source.py b/unit_tests/sources/declarative/test_concurrent_declarative_source.py index 82a564b59..71874248d 100644 --- a/unit_tests/sources/declarative/test_concurrent_declarative_source.py +++ b/unit_tests/sources/declarative/test_concurrent_declarative_source.py @@ -1373,15 +1373,13 @@ def test_concurrent_declarative_source_runs_state_migrations_provided_in_manifes source_config=manifest, config=_CONFIG, catalog=_CATALOG, state=state ) concurrent_streams, synchronous_streams = source._group_streams(_CONFIG) - assert concurrent_streams[0].cursor.state != state_blob.__dict__ - assert concurrent_streams[0].cursor.state == { - "lookback_window": 0, - "states": [ - {"cursor": {"updated_at": "2024-08-21"}, "partition": {"type": "type_1"}}, - {"cursor": {"updated_at": "2024-08-21"}, "partition": {"type": "type_2"}}, - ], - "use_global_cursor": False, - } + assert ( + concurrent_streams[0].cursor.state.get("state") != state_blob.__dict__ + ), "State was not migrated." + assert concurrent_streams[0].cursor.state.get("states") == [ + {"cursor": {"updated_at": "2024-08-21"}, "partition": {"type": "type_1"}}, + {"cursor": {"updated_at": "2024-08-21"}, "partition": {"type": "type_2"}}, + ], "State was migrated, but actual state don't match expected" @freezegun.freeze_time(_NOW) From b82d7a5c2535f2303e0028d104cdf73637874416 Mon Sep 17 00:00:00 2001 From: darynaishchenko Date: Thu, 6 Feb 2025 19:45:11 +0200 Subject: [PATCH 7/9] run state migration in create_concurrent_cursor_from_perpartition_cursor --- .../parsers/model_to_component_factory.py | 6 ++ .../test_model_to_component_factory.py | 62 +++++++++++++++++++ 2 files changed, 68 insertions(+) diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index 9713aedaf..6df676b20 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -1247,6 +1247,12 @@ def create_concurrent_cursor_from_perpartition_cursor( ) ) + if stream_state_migrations: + for state_migration in stream_state_migrations: + if state_migration.should_migrate(stream_state): + # The state variable is expected to be mutable but the migrate method returns an immutable mapping. + stream_state = dict(state_migration.migrate(stream_state)) + # Return the concurrent cursor and state converter return ConcurrentPerPartitionCursor( cursor_factory=cursor_factory, diff --git a/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py b/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py index 4bf4b1079..32a73f364 100644 --- a/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py +++ b/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py @@ -3339,6 +3339,68 @@ def migrate(self, stream_state: Mapping[str, Any]) -> Mapping[str, Any]: ] +def test_create_concurrent_cursor_from_perpartition_cursor_runs_state_migrations(): + class DummyStateMigration: + def should_migrate(self, stream_state: Mapping[str, Any]) -> bool: + return True + + def migrate(self, stream_state: Mapping[str, Any]) -> Mapping[str, Any]: + stream_state["lookback_window"] = 10 * 2 + return stream_state + + state = { + "states": [ + { + "partition": {"type": "typ_1"}, + "cursor": {"updated_at": "2024-08-01T00:00:00.000000Z"}, + } + ], + "state": {"updated_at": "2024-08-01T00:00:00.000000Z"}, + "lookback_window": 10, + "parent_state": {"parent_test": {"last_updated": "2024-08-01T00:00:00.000000Z"}}, + } + config = { + "start_time": "2024-08-01T00:00:00.000000Z", + "end_time": "2024-09-01T00:00:00.000000Z", + } + list_partition_router = ListPartitionRouter( + cursor_field="id", + values=["type_1", "type_2", "type_3"], + config=config, + parameters={}, + ) + connector_state_manager = ConnectorStateManager() + stream_name = "test" + cursor_component_definition = { + "type": "DatetimeBasedCursor", + "cursor_field": "updated_at", + "datetime_format": "%Y-%m-%dT%H:%M:%S.%fZ", + "start_datetime": "{{ config['start_time'] }}", + "end_datetime": "{{ config['end_time'] }}", + "partition_field_start": "custom_start", + "partition_field_end": "custom_end", + "step": "P10D", + "cursor_granularity": "PT0.000001S", + "lookback_window": "P3D", + } + connector_builder_factory = ModelToComponentFactory(emit_connector_builder_messages=True) + cursor = connector_builder_factory.create_concurrent_cursor_from_perpartition_cursor( + state_manager=connector_state_manager, + model_type=DatetimeBasedCursorModel, + component_definition=cursor_component_definition, + stream_name=stream_name, + stream_namespace=None, + config=config, + stream_state=state, + partition_router=list_partition_router, + stream_state_migrations=[DummyStateMigration()], + ) + assert cursor.state["lookback_window"] != 10, "State migration wasn't called" + assert ( + cursor.state["lookback_window"] == 20 + ), "State migration was called, but actual state don't match expected" + + def test_create_concurrent_cursor_uses_min_max_datetime_format_if_defined(): """ Validates a special case for when the start_time.datetime_format and end_time.datetime_format are defined, the date to From 1aae4b786b6f57301d97c8a7c7c1753d53e8ae18 Mon Sep 17 00:00:00 2001 From: darynaishchenko Date: Fri, 7 Feb 2025 16:35:24 +0200 Subject: [PATCH 8/9] refactor code --- .../parsers/model_to_component_factory.py | 26 +++++++++---------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index 6df676b20..345605933 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -934,6 +934,17 @@ def create_concurrency_level( parameters={}, ) + @staticmethod + def apply_stream_state_migrations( + stream_state_migrations: List[Any], stream_state: MutableMapping[str, Any] + ) -> MutableMapping[str, Any]: + if stream_state_migrations: + for state_migration in stream_state_migrations: + if state_migration.should_migrate(stream_state): + # The state variable is expected to be mutable but the migrate method returns an immutable mapping. + stream_state = dict(state_migration.migrate(stream_state)) + return stream_state + def create_concurrent_cursor_from_datetime_based_cursor( self, model_type: Type[BaseModel], @@ -954,11 +965,7 @@ def create_concurrent_cursor_from_datetime_based_cursor( if "stream_state" not in kwargs else kwargs["stream_state"] ) - if stream_state_migrations: - for state_migration in stream_state_migrations: - if state_migration.should_migrate(stream_state): - # The state variable is expected to be mutable but the migrate method returns an immutable mapping. - stream_state = dict(state_migration.migrate(stream_state)) + stream_state = self.apply_stream_state_migrations(stream_state_migrations, stream_state) component_type = component_definition.get("type") if component_definition.get("type") != model_type.__name__: @@ -1246,12 +1253,7 @@ def create_concurrent_cursor_from_perpartition_cursor( stream_state_migrations=stream_state_migrations, ) ) - - if stream_state_migrations: - for state_migration in stream_state_migrations: - if state_migration.should_migrate(stream_state): - # The state variable is expected to be mutable but the migrate method returns an immutable mapping. - stream_state = dict(state_migration.migrate(stream_state)) + stream_state = self.apply_stream_state_migrations(stream_state_migrations, stream_state) # Return the concurrent cursor and state converter return ConcurrentPerPartitionCursor( @@ -1711,7 +1713,6 @@ def _merge_stream_slicers( config=config or {}, stream_state={}, partition_router=stream_slicer, - stream_state_migrations=model.state_migrations, ) return self.create_concurrent_cursor_from_datetime_based_cursor( # type: ignore # This is a known issue that we are creating and returning a ConcurrentCursor which does not technically implement the (low-code) StreamSlicer. However, (low-code) StreamSlicer and ConcurrentCursor both implement StreamSlicer.stream_slices() which is the primary method needed for checkpointing model_type=DatetimeBasedCursorModel, @@ -1719,7 +1720,6 @@ def _merge_stream_slicers( stream_name=model.name or "", stream_namespace=None, config=config or {}, - stream_state_migrations=model.state_migrations, ) incremental_sync_model = model.incremental_sync From b5c98c4db41559d68a5cf20ecaaf3ff0f3354d93 Mon Sep 17 00:00:00 2001 From: darynaishchenko Date: Fri, 7 Feb 2025 16:45:58 +0200 Subject: [PATCH 9/9] mypy fix --- .../sources/declarative/parsers/model_to_component_factory.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index 345605933..c6d69623d 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -936,7 +936,7 @@ def create_concurrency_level( @staticmethod def apply_stream_state_migrations( - stream_state_migrations: List[Any], stream_state: MutableMapping[str, Any] + stream_state_migrations: List[Any] | None, stream_state: MutableMapping[str, Any] ) -> MutableMapping[str, Any]: if stream_state_migrations: for state_migration in stream_state_migrations: