feat(Low-Code Concurrent CDK): Add ConcurrentPerPartitionCursor #111
Merged
43 commits
d326a26 Add concurrent PerPartitionCursor (tolik0)
37efbae Merge branch 'main' into tolik0/concurrent-perpartitioncursor (tolik0)
a3304b9 Use request options provider for ConcurrentPerPartitionCursor (tolik0)
4ddbb84 Delete unused DeclarativePartitionFactory (tolik0)
41b029d Fixed record filter (tolik0)
eb8eec8 Merge branch 'main' into tolik0/concurrent-perpartitioncursor (tolik0)
dfcf17f Add unit test (tolik0)
b84e68a Fix record filter unit tests (tolik0)
2038075 Update poetry lock (tolik0)
c77b9a2 Merge branch 'main' into tolik0/concurrent-perpartitioncursor (tolik0)
c59ed5a Update poetry lock again (tolik0)
a01c0b5 Fix mypy error (tolik0)
357a925 Add global cursor with fallback (tolik0)
79ffb77 Merge branch 'main' into tolik0/concurrent-perpartitioncursor (tolik0)
a36726b Auto-fix lint and format issues
5ee05f1 Fix parent state update in case of error (tolik0)
24268e2 Fix error in tests (tolik0)
660da93 Fix unit tests (tolik0)
23d3059 Add request_option_provider to _request_headers of SimpleRetriever (tolik0)
f3a00ff Merge branch 'main' into tolik0/concurrent-perpartitioncursor (tolik0)
d6bec35 Maxi297/fix simple retriever request headers (#217) (maxi297)
11b86a9 Merge branch 'tolik0/fix-simple-retriever-request-headers' into tolik… (tolik0)
871f1fe Fix format (tolik0)
6d2343a Merge branch 'tolik0/fix-simple-retriever-request-headers' into tolik… (tolik0)
ed687f5 Fix merge conflict (tolik0)
cfef872 Add lookback window handling (tolik0)
4260415 Fix state handling in concurrent cursor (tolik0)
089137f Fix unit test (tolik0)
3489c7a Fix mypy errors (tolik0)
301bd31 Add error test, fix mypy errors (tolik0)
9574f8c Fix stream slice mypy errors (tolik0)
5ab4ee3 Fix lookback window (tolik0)
36c4992 Refactor unit tests (tolik0)
b6707ef Refactor to add helper to get retriever (tolik0)
cf5107f Refactor to add helper to get retriever (tolik0)
df0993e Add class variable for state keys (tolik0)
daa6873 Add exception if stream_slices was executed two times (tolik0)
c827d82 Fix issues with error handling, refactor tests (tolik0)
d4d52b9 Add comments for state format (tolik0)
471ff7e Merge branch 'main' into tolik0/concurrent-perpartitioncursor (tolik0)
0c7c4de Add emitting state after closing every partition (tolik0)
6277e10 Add reqeust validation to unit tests (tolik0)
19fe9c4 Merge branch 'main' into tolik0/concurrent-perpartitioncursor (tolik0)
270 changes: 270 additions & 0 deletions
airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,270 @@ | ||
| import copy | ||
|
|
||
| # | ||
| # Copyright (c) 2023 Airbyte, Inc., all rights reserved. | ||
| # | ||
| import logging | ||
| from collections import OrderedDict | ||
| from typing import Any, Callable, Iterable, Mapping, MutableMapping, Optional | ||
|
|
||
| from airbyte_cdk.sources.connector_state_manager import ConnectorStateManager | ||
| from airbyte_cdk.sources.declarative.incremental.declarative_cursor import DeclarativeCursor | ||
| from airbyte_cdk.sources.declarative.partition_routers.partition_router import PartitionRouter | ||
| from airbyte_cdk.sources.message import MessageRepository | ||
| from airbyte_cdk.sources.streams.checkpoint.per_partition_key_serializer import ( | ||
| PerPartitionKeySerializer, | ||
| ) | ||
| from airbyte_cdk.sources.streams.concurrent.cursor import Cursor, CursorField | ||
| from airbyte_cdk.sources.streams.concurrent.partitions.partition import Partition | ||
| from airbyte_cdk.sources.types import Record, StreamSlice, StreamState | ||
|
|
||
| logger = logging.getLogger("airbyte") | ||
|
|
||
|
|
||
| class ConcurrentCursorFactory: | ||
| def __init__(self, create_function: Callable[..., Cursor]): | ||
| self._create_function = create_function | ||
|
|
||
| def create(self, stream_state: Mapping[str, Any]) -> Cursor: | ||
| return self._create_function(stream_state=stream_state) | ||
|
|
||
|
|
||
| class ConcurrentPerPartitionCursor(Cursor): | ||
| """ | ||
| Manages state per partition when a stream has many partitions, to prevent data loss or duplication. | ||
|
|
||
| **Partition Limitation and Limit Reached Logic** | ||
|
|
||
| - **DEFAULT_MAX_PARTITIONS_NUMBER**: The maximum number of partitions to keep in memory (default is 10,000). | ||
| - **_cursor_per_partition**: An ordered dictionary that stores cursors for each partition. | ||
| - **_over_limit**: A counter that increments each time the oldest partition is removed because the limit was exceeded. | ||
|
|
||
| The class ensures that the number of partitions tracked does not exceed the `DEFAULT_MAX_PARTITIONS_NUMBER` to prevent excessive memory usage. | ||
|
|
||
| - When the number of partitions exceeds the limit, the oldest partitions are removed from `_cursor_per_partition`, and `_over_limit` is incremented accordingly. | ||
| - The `limit_reached` method returns `True` when `_over_limit` exceeds `DEFAULT_MAX_PARTITIONS_NUMBER`, indicating that the global cursor should be used instead of per-partition cursors. | ||
|
|
||
| This approach avoids unnecessary switching to a global cursor due to temporary spikes in partition counts, ensuring that switching is only done when a sustained high number of partitions is observed. | ||
| """ | ||
|
|
||
| DEFAULT_MAX_PARTITIONS_NUMBER = 10000 | ||
| _NO_STATE: Mapping[str, Any] = {} | ||
| _NO_CURSOR_STATE: Mapping[str, Any] = {} | ||
| _KEY = 0 | ||
| _VALUE = 1 | ||
| _state_to_migrate_from: Mapping[str, Any] = {} | ||
|
|
||
| def __init__( | ||
| self, | ||
| cursor_factory: ConcurrentCursorFactory, | ||
| partition_router: PartitionRouter, | ||
| stream_name: str, | ||
| stream_namespace: Optional[str], | ||
| stream_state: Any, | ||
| message_repository: MessageRepository, | ||
| connector_state_manager: ConnectorStateManager, | ||
| cursor_field: CursorField, | ||
| ) -> None: | ||
| self._stream_name = stream_name | ||
| self._stream_namespace = stream_namespace | ||
| self._message_repository = message_repository | ||
| self._connector_state_manager = connector_state_manager | ||
| self._cursor_field = cursor_field | ||
|
|
||
| self._cursor_factory = cursor_factory | ||
| self._partition_router = partition_router | ||
|
|
||
| # The dict is ordered to ensure that once the maximum number of partitions is reached, | ||
| # the oldest partitions can be efficiently removed, maintaining the most recent partitions. | ||
| self._cursor_per_partition: OrderedDict[str, Cursor] = OrderedDict() | ||
| self._over_limit = 0 | ||
| self._partition_serializer = PerPartitionKeySerializer() | ||
|
|
||
| self._set_initial_state(stream_state) | ||
|
|
||
| @property | ||
| def cursor_field(self) -> CursorField: | ||
| return self._cursor_field | ||
|
|
||
| @property | ||
| def state(self) -> MutableMapping[str, Any]: | ||
| states = [] | ||
| for partition_tuple, cursor in self._cursor_per_partition.items(): | ||
| cursor_state = cursor._connector_state_converter.convert_to_state_message( | ||
| cursor._cursor_field, cursor.state | ||
| ) | ||
| if cursor_state: | ||
| states.append( | ||
| { | ||
| "partition": self._to_dict(partition_tuple), | ||
| "cursor": copy.deepcopy(cursor_state), | ||
| } | ||
| ) | ||
| state: dict[str, Any] = {"states": states} | ||
| return state | ||
|
|
||
| def close_partition(self, partition: Partition) -> None: | ||
| self._cursor_per_partition[self._to_partition_key(partition._stream_slice.partition)].close_partition_without_emit(partition=partition) | ||
|
|
||
| def ensure_at_least_one_state_emitted(self) -> None: | ||
| """ | ||
| The platform expects at least one state message on successful syncs. Hence, whatever happens, we expect this method to be | ||
| called. | ||
| """ | ||
| self._emit_state_message() | ||
|
|
||
| def _emit_state_message(self) -> None: | ||
| self._connector_state_manager.update_state_for_stream( | ||
| self._stream_name, | ||
| self._stream_namespace, | ||
| self.state, | ||
| ) | ||
| state_message = self._connector_state_manager.create_state_message( | ||
| self._stream_name, self._stream_namespace | ||
| ) | ||
| self._message_repository.emit_message(state_message) | ||
|
|
||
|
|
||
| def stream_slices(self) -> Iterable[StreamSlice]: | ||
| slices = self._partition_router.stream_slices() | ||
| for partition in slices: | ||
| yield from self.generate_slices_from_partition(partition) | ||
|
|
||
| def generate_slices_from_partition(self, partition: StreamSlice) -> Iterable[StreamSlice]: | ||
| # Ensure the maximum number of partitions is not exceeded | ||
| self._ensure_partition_limit() | ||
|
|
||
| cursor = self._cursor_per_partition.get(self._to_partition_key(partition.partition)) | ||
| if not cursor: | ||
| partition_state = ( | ||
| self._state_to_migrate_from | ||
| if self._state_to_migrate_from | ||
| else self._NO_CURSOR_STATE | ||
| ) | ||
| cursor = self._create_cursor(partition_state) | ||
| self._cursor_per_partition[self._to_partition_key(partition.partition)] = cursor | ||
|
|
||
| for cursor_slice in cursor.stream_slices(): | ||
| yield StreamSlice( | ||
| partition=partition, cursor_slice=cursor_slice, extra_fields=partition.extra_fields | ||
| ) | ||
|
|
||
| def _ensure_partition_limit(self) -> None: | ||
| """ | ||
| Ensure the maximum number of partitions is not exceeded. If adding one more partition would exceed it, the oldest partition is dropped first. | ||
| """ | ||
| while len(self._cursor_per_partition) > self.DEFAULT_MAX_PARTITIONS_NUMBER - 1: | ||
| self._over_limit += 1 | ||
| oldest_partition = self._cursor_per_partition.popitem(last=False)[ | ||
| 0 | ||
| ] # Remove the oldest partition | ||
| logger.warning( | ||
| f"The maximum number of partitions has been reached. Dropping the oldest partition: {oldest_partition}. Over limit: {self._over_limit}." | ||
| ) | ||
|
|
||
| def limit_reached(self) -> bool: | ||
| return self._over_limit > self.DEFAULT_MAX_PARTITIONS_NUMBER | ||
|
|
||
| def _set_initial_state(self, stream_state: StreamState) -> None: | ||
| """ | ||
| Set the initial state for the cursors. | ||
|
|
||
| This method initializes the state for each partition cursor using the provided stream state. | ||
| If a partition state is provided in the stream state, it will update the corresponding partition cursor with this state. | ||
|
|
||
| Additionally, it sets the parent state for partition routers that are based on parent streams. If a partition router | ||
| does not have parent streams, this step will be skipped due to the default PartitionRouter implementation. | ||
|
|
||
| Args: | ||
| stream_state (StreamState): The state of the streams to be set. The format of the stream state should be: | ||
| { | ||
| "states": [ | ||
| { | ||
| "partition": { | ||
| "partition_key": "value" | ||
| }, | ||
| "cursor": { | ||
| "last_updated": "2023-05-27T00:00:00Z" | ||
| } | ||
| } | ||
| ], | ||
| "parent_state": { | ||
| "parent_stream_name": { | ||
| "last_updated": "2023-05-27T00:00:00Z" | ||
| } | ||
| } | ||
| } | ||
| """ | ||
| if not stream_state: | ||
| return | ||
|
|
||
| if "states" not in stream_state: | ||
| # We assume that `stream_state` is in a global format that can be applied to all partitions. | ||
| # Example: {"global_state_format_key": "global_state_format_value"} | ||
| self._state_to_migrate_from = stream_state | ||
|
|
||
| else: | ||
| for state in stream_state["states"]: | ||
| self._cursor_per_partition[self._to_partition_key(state["partition"])] = ( | ||
| self._create_cursor(state["cursor"]) | ||
| ) | ||
|
|
||
| # Set the default state for missing partitions when the state is per-partition with a fallback to global | ||
| if "state" in stream_state: | ||
| self._state_to_migrate_from = stream_state["state"] | ||
|
|
||
| # Set parent state for partition routers based on parent streams | ||
| self._partition_router.set_initial_state(stream_state) | ||
|
|
||
| def observe(self, record: Record) -> None: | ||
| self._cursor_per_partition[self._to_partition_key(record.associated_slice.partition)].observe(record) | ||
|
|
||
| def _to_partition_key(self, partition: Mapping[str, Any]) -> str: | ||
| return self._partition_serializer.to_partition_key(partition) | ||
|
|
||
| def _to_dict(self, partition_key: str) -> Mapping[str, Any]: | ||
| return self._partition_serializer.to_partition(partition_key) | ||
|
|
||
| def _create_cursor(self, cursor_state: Any) -> Cursor: | ||
| cursor = self._cursor_factory.create(stream_state=cursor_state) | ||
| return cursor | ||
|
|
||
| def should_be_synced(self, record: Record) -> bool: | ||
| return self._get_cursor(record).should_be_synced(record) | ||
|
|
||
| def is_greater_than_or_equal(self, first: Record, second: Record) -> bool: | ||
| if not first.associated_slice or not second.associated_slice: | ||
| raise ValueError( | ||
| f"Both records should have an associated slice but got {first.associated_slice} and {second.associated_slice}" | ||
| ) | ||
| if first.associated_slice.partition != second.associated_slice.partition: | ||
| raise ValueError( | ||
| f"To compare records, partition should be the same but got {first.associated_slice.partition} and {second.associated_slice.partition}" | ||
| ) | ||
|
|
||
| return self._get_cursor(first).is_greater_than_or_equal( | ||
| self._convert_record_to_cursor_record(first), | ||
| self._convert_record_to_cursor_record(second), | ||
| ) | ||
|
|
||
| @staticmethod | ||
| def _convert_record_to_cursor_record(record: Record) -> Record: | ||
| return Record( | ||
| record.data, | ||
| StreamSlice(partition={}, cursor_slice=record.associated_slice.cursor_slice) | ||
| if record.associated_slice | ||
| else None, | ||
| ) | ||
|
|
||
| def _get_cursor(self, record: Record) -> Cursor: | ||
| if not record.associated_slice: | ||
| raise ValueError( | ||
| "Invalid state as stream slices that are emitted should refer to an existing cursor" | ||
| ) | ||
| partition_key = self._to_partition_key(record.associated_slice.partition) | ||
| if partition_key not in self._cursor_per_partition: | ||
| raise ValueError( | ||
| "Invalid state as stream slices that are emitted should refer to an existing cursor" | ||
| ) | ||
| cursor = self._cursor_per_partition[partition_key] | ||
| return cursor | ||
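The partition-limit behavior described in the class docstring can be tried in isolation. The following is a minimal sketch, not the CDK implementation: `PartitionLimiter`, its `add` method, and the small `max_partitions` value are hypothetical names chosen for illustration. Only the `OrderedDict` eviction, the `over_limit` counter, and the `limit_reached` comparison mirror the diff above.

```python
from collections import OrderedDict
from typing import Any


class PartitionLimiter:
    """Hypothetical stand-in for the partition bookkeeping in ConcurrentPerPartitionCursor."""

    def __init__(self, max_partitions: int) -> None:
        self.max_partitions = max_partitions
        # Insertion order is preserved, so the first item is always the oldest partition.
        self.cursor_per_partition: "OrderedDict[str, Any]" = OrderedDict()
        self.over_limit = 0

    def ensure_partition_limit(self) -> None:
        # Make room for one more entry by evicting the oldest partitions first.
        while len(self.cursor_per_partition) > self.max_partitions - 1:
            self.over_limit += 1
            self.cursor_per_partition.popitem(last=False)

    def add(self, key: str, cursor: Any) -> None:
        self.ensure_partition_limit()
        self.cursor_per_partition[key] = cursor

    def limit_reached(self) -> bool:
        # True only after more evictions than the limit itself, so a short spike
        # in partition count does not trip the switch.
        return self.over_limit > self.max_partitions


limiter = PartitionLimiter(max_partitions=3)
for i in range(5):
    limiter.add(f"partition-{i}", {"cursor": i})

print(list(limiter.cursor_per_partition))  # ['partition-2', 'partition-3', 'partition-4']
print(limiter.over_limit)                  # 2
print(limiter.limit_reached())             # False
```

With a limit of 3, adding five partitions evicts the two oldest, yet `limit_reached()` stays `False` until the number of evictions exceeds the limit. This is the "sustained high number of partitions" condition the docstring refers to.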