- 
                Notifications
    You must be signed in to change notification settings 
- Fork 30
feat(AsyncRetriever): Allow for streams using AsyncRetriever and DatetimeBasedCursor to perform checkpointing #226
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
…form checkpointing
| 📝 WalkthroughWalkthroughThe pull request introduces enhancements to the Airbyte CDK's declarative source components, focusing on improving async retriever and partition router functionality. The changes primarily involve updating cursor management, adding support for  Changes
 Sequence DiagramsequenceDiagram
    participant AsyncRetriever
    participant StreamSlicer
    participant Cursor
    
    AsyncRetriever->>StreamSlicer: Get stream slice
    StreamSlicer-->>AsyncRetriever: Return stream slice
    AsyncRetriever->>Cursor: Observe cursor state
    AsyncRetriever->>AsyncRetriever: Process records
    AsyncRetriever->>Cursor: Update cursor state
Possibly related PRs
 Suggested labels
 Suggested reviewers
 Hey there! 👋 I noticed these changes look quite interesting. Would you like me to elaborate on any specific aspect of the modifications? The cursor management enhancements seem particularly intriguing. Wdyt? 🤔 Finishing Touches
 Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media? 🪧 TipsChatThere are 3 ways to chat with CodeRabbit: 
 
 Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (Invoked using PR comments)
 Other keywords and placeholders
 CodeRabbit Configuration File ( | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
🧹 Nitpick comments (4)
airbyte_cdk/sources/declarative/partition_routers/async_job_partition_router.py (2)
40-43: Consider adding type hints for better code clarityThe type hint for
self.stream_slicerin the condition would make the code more maintainable. What do you think about this change?- if isinstance(self.stream_slicer, DatetimeBasedCursor): + if isinstance(self.stream_slicer, DatetimeBasedCursor): # type: StreamSlicer
45-47: Add docstring to the cursor propertyWould you consider adding a docstring to explain the purpose and return type of this property? Something like:
@property def cursor(self) -> Optional[DeclarativeCursor]: + """ + Returns the cursor associated with this partition router if it exists. + + Returns: + Optional[DeclarativeCursor]: The cursor used for checkpointing, or None if not available. + """ return self._cursorairbyte_cdk/sources/declarative/retrievers/async_retriever.py (2)
88-113: Consider extracting cursor-related logic to improve readabilityThe
read_recordsmethod has grown complex with the addition of cursor management. Would you consider extracting the cursor-related logic into a separate method for better maintainability? Something like:+ def _handle_cursor_updates( + self, + stream_data: Optional[StreamData], + stream_slice: StreamSlice, + most_recent_record: Optional[Record], + ) -> Optional[Record]: + """Handle cursor updates for a single record.""" + if self.cursor and stream_data: + self.cursor.observe(stream_slice, stream_data) + return self._get_most_recent_record(most_recent_record, stream_data, stream_slice) + return most_recent_record def read_records( self, records_schema: Mapping[str, Any], stream_slice: Optional[StreamSlice] = None, ) -> Iterable[StreamData]: _slice = stream_slice or StreamSlice(partition={}, cursor_slice={}) stream_state: StreamState = self.state partition: AsyncPartition = self._validate_and_get_stream_slice_partition(stream_slice) records: Iterable[Mapping[str, Any]] = self.stream_slicer.fetch_records(partition) most_recent_record_from_slice = None for stream_data in self.record_selector.filter_and_transform( all_data=records, stream_state=stream_state, records_schema=records_schema, stream_slice=_slice, ): - if self.cursor and stream_data: - self.cursor.observe(_slice, stream_data) - - most_recent_record_from_slice = self._get_most_recent_record( - most_recent_record_from_slice, stream_data, _slice - ) + most_recent_record_from_slice = self._handle_cursor_updates( + stream_data, _slice, most_recent_record_from_slice + ) yield stream_data
115-131: Add early return for optimizationIn
_get_most_recent_record, we could optimize the logic with an early return. What do you think about this change?def _get_most_recent_record( self, current_most_recent: Optional[Record], current_record: Optional[Record], stream_slice: StreamSlice, ) -> Optional[Record]: + if not self.cursor or not current_record: + return None + if not current_most_recent: + return current_record + return ( + current_most_recent + if self.cursor.is_greater_than_or_equal(current_most_recent, current_record) + else current_record + ) - if self.cursor and current_record: - if not current_most_recent: - return current_record - else: - return ( - current_most_recent - if self.cursor.is_greater_than_or_equal(current_most_recent, current_record) - else current_record - ) - else: - return None
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (3)
- airbyte_cdk/sources/declarative/declarative_stream.py(3 hunks)
- airbyte_cdk/sources/declarative/partition_routers/async_job_partition_router.py(2 hunks)
- airbyte_cdk/sources/declarative/retrievers/async_retriever.py(3 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (8)
- GitHub Check: Check: 'source-pokeapi' (skip=false)
- GitHub Check: Check: 'source-the-guardian-api' (skip=false)
- GitHub Check: Check: 'source-shopify' (skip=false)
- GitHub Check: Check: 'source-hardcoded-records' (skip=false)
- GitHub Check: Pytest (All, Python 3.11, Ubuntu)
- GitHub Check: Pytest (Fast)
- GitHub Check: Pytest (All, Python 3.10, Ubuntu)
- GitHub Check: Analyze (python)
🔇 Additional comments (1)
airbyte_cdk/sources/declarative/declarative_stream.py (1)
193-196: LGTM! Clean implementation of AsyncRetriever support.The extension of
get_cursorto supportAsyncRetrieveris well-implemented and aligns with the PR objectives.
| closing in favor of #228 | 
What
Add the ability for streams that define an AsyncRetriever and a DatetimeBasedCursor to perform periodic checkpointing when the date time window of a stream slice is synced successfully.
This is needed to unblock
source-amazon-seller-partnerwhich supports incremental async jobs.How
It's worth noting that this PR goes a bit in the opposite direction where we want to deprecate the existing
DatetimeBasedCursorbecause now its more closely integrated into theAsyncRetrieverstream slicing mechanism. However, due to the short time frame to deliver Amazon Seller Partner, trying to inject theConcurrentCursorinto the low-codeAsyncJobPartitionRouterfelt like the more difficult path.The tradeoff in the short term here is that our Async + Incremental streams will run synchronously (worth noting that this was already the existing behavior based on how we construct the
concurrent_declarative_source.py) So we incur a little bit of tech debt in exchange for simplicity to implementSummary by CodeRabbit
New Features
Improvements
Technical Enhancements