Skip to content

Commit e61916c

Browse files
updated unit test, refactor state migrations
1 parent 08198ac commit e61916c

File tree

2 files changed

+27
-98
lines changed

2 files changed

+27
-98
lines changed

airbyte_cdk/sources/declarative/concurrent_declarative_source.py

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
#
44

55
import logging
6-
from typing import Any, Generic, Iterator, List, Mapping, Optional, Tuple
6+
from typing import Any, Generic, Iterator, List, Mapping, Optional, Tuple, MutableMapping
77

88
from airbyte_cdk.models import (
99
AirbyteCatalog,
@@ -224,10 +224,7 @@ def _group_streams(
224224
stream_state = self._connector_state_manager.get_stream_state(
225225
stream_name=declarative_stream.name, namespace=declarative_stream.namespace
226226
)
227-
for state_migration in declarative_stream.state_migrations:
228-
if state_migration.should_migrate(stream_state):
229-
# The state variable is expected to be mutable but the migrate method returns an immutable mapping.
230-
stream_state = dict(state_migration.migrate(stream_state))
227+
stream_state = self._migrate_state(declarative_stream, stream_state)
231228

232229
retriever = self._get_retriever(declarative_stream, stream_state)
233230

@@ -335,10 +332,7 @@ def _group_streams(
335332
stream_state = self._connector_state_manager.get_stream_state(
336333
stream_name=declarative_stream.name, namespace=declarative_stream.namespace
337334
)
338-
for state_migration in declarative_stream.state_migrations:
339-
if state_migration.should_migrate(stream_state):
340-
# The state variable is expected to be mutable but the migrate method returns an immutable mapping.
341-
stream_state = dict(state_migration.migrate(stream_state))
335+
stream_state = self._migrate_state(declarative_stream, stream_state)
342336

343337
partition_router = declarative_stream.retriever.stream_slicer._partition_router
344338

@@ -530,3 +524,14 @@ def _remove_concurrent_streams_from_catalog(
530524
if stream.stream.name not in concurrent_stream_names
531525
]
532526
)
527+
528+
@staticmethod
529+
def _migrate_state(
530+
declarative_stream: DeclarativeStream, stream_state: MutableMapping[str, Any]
531+
) -> MutableMapping[str, Any]:
532+
for state_migration in declarative_stream.state_migrations:
533+
if state_migration.should_migrate(stream_state):
534+
# The state variable is expected to be mutable but the migrate method returns an immutable mapping.
535+
stream_state = dict(state_migration.migrate(stream_state))
536+
537+
return stream_state

unit_tests/sources/declarative/test_concurrent_declarative_source.py

Lines changed: 13 additions & 89 deletions
Original file line numberDiff line numberDiff line change
@@ -1231,8 +1231,7 @@ def test_read_with_concurrent_and_synchronous_streams_with_sequential_state():
12311231
assert len(party_members_skills_records) == 9
12321232

12331233

1234-
@freezegun.freeze_time(_NOW)
1235-
def test_read_with_state_when_state_migration_was_provided():
1234+
def test_concurrent_declarative_source_runs_state_migrations_provided_in_manifest():
12361235
manifest = {
12371236
"version": "5.0.0",
12381237
"definitions": {
@@ -1360,104 +1359,29 @@ def test_read_with_state_when_state_migration_was_provided():
13601359
"max_concurrency": 25,
13611360
},
13621361
}
1362+
state_blob = AirbyteStateBlob(updated_at="2024-08-21")
13631363
state = [
13641364
AirbyteStateMessage(
13651365
type=AirbyteStateType.STREAM,
13661366
stream=AirbyteStreamState(
13671367
stream_descriptor=StreamDescriptor(name="party_members", namespace=None),
1368-
stream_state=AirbyteStateBlob(updated_at="2024-08-21"),
1368+
stream_state=state_blob,
13691369
),
13701370
),
13711371
]
1372-
catalog = ConfiguredAirbyteCatalog(
1373-
streams=[
1374-
ConfiguredAirbyteStream(
1375-
stream=AirbyteStream(
1376-
name="party_members",
1377-
json_schema={},
1378-
supported_sync_modes=[SyncMode.incremental],
1379-
),
1380-
sync_mode=SyncMode.incremental,
1381-
destination_sync_mode=DestinationSyncMode.append,
1382-
)
1383-
]
1384-
)
13851372
source = ConcurrentDeclarativeSource(
13861373
source_config=manifest, config=_CONFIG, catalog=_CATALOG, state=state
13871374
)
1388-
party_members_slices_and_responses = [
1389-
(
1390-
{"start": "2024-08-16", "end": "2024-08-30", "filter": "type_1"},
1391-
HttpResponse(
1392-
json.dumps(
1393-
[
1394-
{
1395-
"id": "nijima",
1396-
"first_name": "makoto",
1397-
"last_name": "nijima",
1398-
"updated_at": "2024-08-10",
1399-
"type": 1,
1400-
}
1401-
]
1402-
)
1403-
),
1404-
),
1405-
(
1406-
{"start": "2024-08-16", "end": "2024-08-30", "filter": "type_2"},
1407-
HttpResponse(
1408-
json.dumps(
1409-
[
1410-
{
1411-
"id": "nijima",
1412-
"first_name": "makoto",
1413-
"last_name": "nijima",
1414-
"updated_at": "2024-08-10",
1415-
"type": 2,
1416-
}
1417-
]
1418-
)
1419-
),
1420-
),
1421-
(
1422-
{"start": "2024-08-31", "end": "2024-09-10", "filter": "type_1"},
1423-
HttpResponse(
1424-
json.dumps(
1425-
[
1426-
{
1427-
"id": "yoshizawa",
1428-
"first_name": "sumire",
1429-
"last_name": "yoshizawa",
1430-
"updated_at": "2024-09-10",
1431-
"type": 1,
1432-
}
1433-
]
1434-
)
1435-
),
1436-
),
1437-
(
1438-
{"start": "2024-08-31", "end": "2024-09-10", "filter": "type_2"},
1439-
HttpResponse(
1440-
json.dumps(
1441-
[
1442-
{
1443-
"id": "yoshizawa",
1444-
"first_name": "sumire",
1445-
"last_name": "yoshizawa",
1446-
"updated_at": "2024-09-10",
1447-
"type": 2,
1448-
}
1449-
]
1450-
)
1451-
),
1452-
),
1453-
]
1454-
with HttpMocker() as http_mocker:
1455-
_mock_party_members_requests(http_mocker, party_members_slices_and_responses)
1456-
messages = list(
1457-
source.read(logger=source.logger, config=_CONFIG, catalog=catalog, state=state)
1458-
)
1459-
final_state = get_states_for_stream(stream_name="party_members", messages=messages)
1460-
assert state not in final_state
1375+
concurrent_streams, synchronous_streams = source._group_streams(_CONFIG)
1376+
assert concurrent_streams[0].cursor.state != state_blob.__dict__
1377+
assert concurrent_streams[0].cursor.state == {
1378+
"lookback_window": 0,
1379+
"states": [
1380+
{"cursor": {"updated_at": "2024-08-21"}, "partition": {"type": "type_1"}},
1381+
{"cursor": {"updated_at": "2024-08-21"}, "partition": {"type": "type_2"}},
1382+
],
1383+
"use_global_cursor": False,
1384+
}
14611385

14621386

14631387
@freezegun.freeze_time(_NOW)

0 commit comments

Comments
 (0)