Skip to content

Commit 6c0d36d

Browse files
maxi297octavia-squidington-iii
andauthored
fix: state migration (#690)
Co-authored-by: octavia-squidington-iii <[email protected]>
1 parent cb193ab commit 6c0d36d

File tree

1 file changed

+20
-25
lines changed

1 file changed

+20
-25
lines changed

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

Lines changed: 20 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -2112,23 +2112,22 @@ def _build_incremental_cursor(
21122112
stream_slicer: Optional[PartitionRouter],
21132113
config: Config,
21142114
) -> Optional[StreamSlicer]:
2115+
state_transformations = (
2116+
[
2117+
self._create_component_from_model(state_migration, config, declarative_stream=model)
2118+
for state_migration in model.state_migrations
2119+
]
2120+
if model.state_migrations
2121+
else []
2122+
)
2123+
21152124
if model.incremental_sync and stream_slicer:
21162125
if model.retriever.type == "AsyncRetriever":
21172126
stream_name = model.name or ""
21182127
stream_namespace = None
21192128
stream_state = self._connector_state_manager.get_stream_state(
21202129
stream_name, stream_namespace
21212130
)
2122-
state_transformations = (
2123-
[
2124-
self._create_component_from_model(
2125-
state_migration, config, declarative_stream=model
2126-
)
2127-
for state_migration in model.state_migrations
2128-
]
2129-
if model.state_migrations
2130-
else []
2131-
)
21322131

21332132
return self.create_concurrent_cursor_from_perpartition_cursor( # type: ignore # This is a known issue that we are creating and returning a ConcurrentCursor which does not technically implement the (low-code) StreamSlicer. However, (low-code) StreamSlicer and ConcurrentCursor both implement StreamSlicer.stream_slices() which is the primary method needed for checkpointing
21342133
state_manager=self._connector_state_manager,
@@ -2172,7 +2171,7 @@ def _build_incremental_cursor(
21722171
stream_name=model.name or "",
21732172
stream_namespace=None,
21742173
config=config or {},
2175-
stream_state_migrations=model.state_migrations,
2174+
stream_state_migrations=state_transformations,
21762175
)
21772176
return self._create_component_from_model(model=model.incremental_sync, config=config) # type: ignore[no-any-return] # Will be created Cursor as stream_slicer_model is model.incremental_sync
21782177
return None
@@ -2187,19 +2186,15 @@ def _build_concurrent_cursor(
21872186
stream_name=model.name or "", namespace=None
21882187
)
21892188

2190-
if model.incremental_sync and stream_slicer:
2191-
# FIXME there is a discrepancy where this logic is applied on the create_*_cursor methods for
2192-
# ConcurrentCursor but it is applied outside of create_concurrent_cursor_from_perpartition_cursor
2193-
if model.state_migrations:
2194-
state_transformations = [
2195-
self._create_component_from_model(
2196-
state_migration, config, declarative_stream=model
2197-
)
2198-
for state_migration in model.state_migrations
2199-
]
2200-
else:
2201-
state_transformations = []
2189+
if model.state_migrations:
2190+
state_transformations = [
2191+
self._create_component_from_model(state_migration, config, declarative_stream=model)
2192+
for state_migration in model.state_migrations
2193+
]
2194+
else:
2195+
state_transformations = []
22022196

2197+
if model.incremental_sync and stream_slicer:
22032198
return self.create_concurrent_cursor_from_perpartition_cursor( # type: ignore # This is a known issue that we are creating and returning a ConcurrentCursor which does not technically implement the (low-code) StreamSlicer. However, (low-code) StreamSlicer and ConcurrentCursor both implement StreamSlicer.stream_slices() which is the primary method needed for checkpointing
22042199
state_manager=self._connector_state_manager,
22052200
model_type=DatetimeBasedCursorModel,
@@ -2220,7 +2215,7 @@ def _build_concurrent_cursor(
22202215
stream_name=model.name or "",
22212216
stream_namespace=None,
22222217
config=config or {},
2223-
stream_state_migrations=model.state_migrations,
2218+
stream_state_migrations=state_transformations,
22242219
)
22252220
elif type(model.incremental_sync) == DatetimeBasedCursorModel:
22262221
return self.create_concurrent_cursor_from_datetime_based_cursor( # type: ignore # This is a known issue that we are creating and returning a ConcurrentCursor which does not technically implement the (low-code) StreamSlicer. However, (low-code) StreamSlicer and ConcurrentCursor both implement StreamSlicer.stream_slices() which is the primary method needed for checkpointing
@@ -2229,7 +2224,7 @@ def _build_concurrent_cursor(
22292224
stream_name=model.name or "",
22302225
stream_namespace=None,
22312226
config=config or {},
2232-
stream_state_migrations=model.state_migrations,
2227+
stream_state_migrations=state_transformations,
22332228
attempt_to_create_cursor_if_not_provided=True,
22342229
)
22352230
else:

0 commit comments

Comments
 (0)