Implement scanner API for Consensus Commit #2711
Pull Request Overview
This PR implements a scanner API for Consensus Commit transactions by updating multiple transaction and snapshot related classes and enhancing the corresponding integration and unit tests. Key changes include:
- Implementation of scanner methods (getScanner, one, all, close) in transaction manager classes.
- Refactoring of snapshot and CRUD handler logic to incorporate scanner state and error recovery.
- Extensive updates to both integration and core tests to validate the new scanner API behavior.
Reviewed Changes
Copilot reviewed 16 out of 16 changed files in this pull request and generated 2 comments.
File | Description |
---|---|
integration-test/src/main/java/com/scalar/db/transaction/consensuscommit/TwoPhaseConsensusCommitIntegrationTestBase.java | Removed disabled tests to focus on active scanner API tests. |
core/src/main/java/com/scalar/db/transaction/consensuscommit/TwoPhaseConsensusCommitManager.java | Replaced unsupported scanner method with a concrete implementation that wraps scanner behavior and ensures transaction rollback on errors. |
core/src/main/java/com/scalar/db/transaction/consensuscommit/CrudHandler.java | Added scanner management (tracking, closing) and new methods getScanner()/closeScanners() for proper lifecycle management. |
core/src/main/java/com/scalar/db/transaction/consensuscommit/Snapshot.java | Extended snapshot handling to include scannerSet and adjusted validation logic for not fully scanned scanners. |
core/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommit*.java | Updated scanner implementations to use new recovery and lazy evaluation strategies. |
core/src/main/java/com/scalar/db/common/error/CoreError.java | Added new error codes for scanner closure requirements. |
Comments suppressed due to low confidence (1)
core/src/main/java/com/scalar/db/transaction/consensuscommit/TwoPhaseConsensusCommitManager.java:193
- [nitpick] Consider adding a brief inline explanation describing the error recovery flow—specifically how rollbackTransaction is invoked when scanner acquisition fails—to improve the readability and maintainability of the scanner API implementation.
public Scanner getScanner(Scan scan) throws CrudException {
  return scanners.stream().allMatch(ConsensusCommitScanner::isClosed);
}

public void closeScanners() throws CrudException {
Consider clearing the internal 'scanners' list at the end of closeScanners() to release references to closed scanners and to avoid potential side effects on subsequent scanner operations.
@@ -604,6 +624,11 @@ private void validateScanResults(
  return;
}

if (notFullyScannedScanner) {
[nitpick] Consider adding an inline comment explaining why further validation is skipped for not fully scanned scanners, to clarify the intentional trade-off in snapshot consistency checks.
6e3bde3 to bdb959c
@@ -213,6 +249,10 @@ public void mutate(List<? extends Mutation> mutations) throws CrudException {

@Override
public void commit() throws CommitException, UnknownTransactionStatusException {
  if (!crud.areAllScannersClosed()) {
Check if all the scanners are closed before committing a transaction.
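As a rough illustration of this guard, the sketch below tracks every scanner a transaction hands out and rejects a commit while any of them is still open. The class names `ClosableScannerSketch` and `CommitGuardSketch` and the `IllegalStateException` are hypothetical stand-ins chosen for the example, not the ScalarDB classes or error codes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a transactional scanner; it only tracks its closed state.
final class ClosableScannerSketch implements AutoCloseable {
  private boolean closed;

  boolean isClosed() {
    return closed;
  }

  @Override
  public void close() {
    closed = true;
  }
}

// Hypothetical stand-in for a transaction that remembers the scanners it created.
final class CommitGuardSketch {
  private final List<ClosableScannerSketch> scanners = new ArrayList<>();

  ClosableScannerSketch getScanner() {
    ClosableScannerSketch scanner = new ClosableScannerSketch();
    scanners.add(scanner);
    return scanner;
  }

  boolean areAllScannersClosed() {
    return scanners.stream().allMatch(ClosableScannerSketch::isClosed);
  }

  void commit() {
    if (!areAllScannersClosed()) {
      // Assumption: the real code raises its own exception type with a dedicated error code.
      throw new IllegalStateException("All scanners must be closed before committing");
    }
    // The actual commit work would go here.
  }
}
```

With this shape, a caller that forgets to close a scanner learns about it at commit time instead of silently leaking the underlying resource.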
Optional<LinkedHashMap<Snapshot.Key, TransactionResult>> resultsInSnapshot =
    snapshot.getResults(scan);
if (resultsInSnapshot.isPresent()) {
  scanner =
      new ConsensusCommitSnapshotScanner(scan, originalProjections, resultsInSnapshot.get());
} else {
  scanner = new ConsensusCommitStorageScanner(scan, originalProjections);
}
If the scan result of the same scan is in the scanSet in the snapshot, we use it. If not, we scan the storage directly.
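The selection rule can be sketched in isolation as follows. This is a simplified model under stated assumptions: scans are identified by a plain string, records are strings, and `ScanSourceSketch` with its `lastSource()` helper is a hypothetical class invented for the example.

```java
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical model of "use the snapshot if it has the scan, otherwise scan the storage".
final class ScanSourceSketch {
  // Results of scans already completed in this transaction, keyed by a scan identifier.
  private final Map<String, List<String>> scanSet = new LinkedHashMap<>();
  // Stand-in for the underlying storage.
  private final Map<String, List<String>> storage;
  private String lastSource = "none";

  ScanSourceSketch(Map<String, List<String>> storage) {
    this.storage = storage;
  }

  void putIntoScanSet(String scan, List<String> results) {
    scanSet.put(scan, results);
  }

  Optional<List<String>> getResults(String scan) {
    return Optional.ofNullable(scanSet.get(scan));
  }

  Iterator<String> getScanner(String scan) {
    Optional<List<String>> resultsInSnapshot = getResults(scan);
    if (resultsInSnapshot.isPresent()) {
      // The same scan was already executed in this transaction: replay its results.
      lastSource = "snapshot";
      return resultsInSnapshot.get().iterator();
    }
    // Otherwise read from the storage directly.
    lastSource = "storage";
    return storage.getOrDefault(scan, Collections.<String>emptyList()).iterator();
  }

  // Exposed only so the example can show which path was taken.
  String lastSource() {
    return lastSource;
  }
}
```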
}

@NotThreadSafe
private class ConsensusCommitStorageScanner extends AbstractTransactionCrudOperableScanner
The scanner implementation for scanning the storage.
}

@NotThreadSafe
private class ConsensusCommitSnapshotScanner extends AbstractTransactionCrudOperableScanner
The scanner implementation for scanning the result in the snapshot.
if (fullyScanned) {
  // If the scanner is fully scanned, we can treat it as a normal scan, and put the results
  // into the scan set
  snapshot.putIntoScanSet(scan, results);
} else {
  // If the scanner is not fully scanned, put the results into the scanner set
  snapshot.putIntoScannerSet(scan, results);
}
This is a tricky part. The behavior depends on whether the storage scanner has been fully scanned.
If the storage scanner has been fully scanned, we can treat it as a normal scan and put the result into the scan set, just like a regular scan.
If the storage scanner has not been fully scanned, we put the results into the scanner set.
Is the reason we need to handle the not-fully-scanned scan differently that there can be remaining records? If so, how about reading the remaining records in this method and adding them to results so that we can treat the scan as a fully scanned one?
> The reason why we need to differently handle the not-fully-scanned scan is that there can be remaining records?

Yes. Because of that, the not-fully-scanned scan needs special handling in the serializable validation, similar to a scan with a limit.

> If so, how about reading the remaining records in this method and adding them to results so that we can treat the scan as a fully scanned one?

In that case, if there are many remaining records, we would consume a lot of memory. One of the advantages of the scanner API is that it only consumes memory as records are scanned, so we should avoid doing that.
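The bookkeeping discussed in this thread can be sketched as follows. It assumes a scanner counts as fully scanned once `one()` has returned an empty result, and it uses hypothetical classes (`SnapshotSketch`, `LazyScannerSketch`) with string records rather than the ScalarDB types.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;

// Hypothetical snapshot holding the two sets discussed above.
final class SnapshotSketch {
  final List<List<String>> scanSet = new ArrayList<>();
  final List<List<String>> scannerSet = new ArrayList<>();
}

// Hypothetical scanner that pulls records lazily and remembers only what was read.
final class LazyScannerSketch implements AutoCloseable {
  private final Iterator<String> source;
  private final SnapshotSketch snapshot;
  private final List<String> results = new ArrayList<>();
  private boolean fullyScanned;
  private boolean closed;

  LazyScannerSketch(Iterator<String> source, SnapshotSketch snapshot) {
    this.source = source;
    this.snapshot = snapshot;
  }

  Optional<String> one() {
    if (!source.hasNext()) {
      // Assumption: reaching the end of the source marks the scanner as fully scanned.
      fullyScanned = true;
      return Optional.empty();
    }
    String record = source.next();
    results.add(record); // memory grows only with the records actually read
    return Optional.of(record);
  }

  @Override
  public void close() {
    if (closed) {
      return;
    }
    closed = true;
    if (fullyScanned) {
      snapshot.scanSet.add(results); // behaves like a normal scan
    } else {
      snapshot.scannerSet.add(results); // only a prefix was read
    }
  }
}
```

Closing after a partial read records just the prefix, which is why validation later has to treat the scanner set differently from the scan set.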
I see. That sounds reasonable.
@@ -64,6 +64,9 @@ public class Snapshot {
private final Map<Key, Put> writeSet;
private final Map<Key, Delete> deleteSet;

// The scanner set used to store information about scanners that are not fully scanned
private final List<ScannerInfo> scannerSet;
Added scannerSet that's used to store information about scanners that are not fully scanned.
// Scanner set is re-validated to check if there is no anti-dependency
for (ScannerInfo scannerInfo : scannerSet) {
  tasks.add(() -> validateScanResults(storage, scannerInfo.scan, scannerInfo.results, true));
}
Added serializable validation for the scanner set.
if (notFullyScannedScanner) {
  // If the scanner is not fully scanned, no further checks are needed
  return;
}
If the scanner has not been fully scanned, no further check is needed. This is similar to the case when a limit is specified:
scalardb/core/src/main/java/com/scalar/db/transaction/consensuscommit/Snapshot.java
Lines 622 to 625 in 1717cfb
if (scan.getLimit() != 0 && results.size() == scan.getLimit()) {
  // We’ve already checked up to the limit, so no further checks are needed
  return;
}
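A heavily simplified model of this early return is sketched below. It compares records by value only, whereas the real validation works on transaction metadata; the class `ScanValidationSketch` and its method are hypothetical and only illustrate why a partially read scanner needs no check past the records it actually returned.

```java
import java.util.Iterator;
import java.util.List;

// Hypothetical, value-based model of re-validating a recorded scan against a fresh scan.
final class ScanValidationSketch {
  private ScanValidationSketch() {}

  static boolean isStillValid(
      List<String> recorded, Iterator<String> latest, boolean notFullyScannedScanner) {
    // Every record the transaction saw must still appear, in the same order.
    for (String expected : recorded) {
      if (!latest.hasNext() || !latest.next().equals(expected)) {
        return false; // a record was changed, removed, or inserted within the prefix
      }
    }
    if (notFullyScannedScanner) {
      // Records beyond the prefix were never read by the transaction,
      // so changes there cannot conflict with it.
      return true;
    }
    // A fully scanned result must also have no new trailing records.
    return !latest.hasNext();
  }
}
```

For example, with recorded results `[a, b]` and a fresh scan returning `[a, b, c]`, the check passes for a not-fully-scanned scanner and fails for a fully scanned one, because `c` would be a phantom for the latter.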
  return scanner.one();
} catch (UncommittedRecordException e) {
  lazyRecovery(e);
  throw e;
[minor] Is there any reason not to close the scanner here, while ConsensusCommitManager.getScanner() closes it?
We don't close the scanner here, but we close it here:
https://github.com/scalar-labs/scalardb/pull/2711/files#diff-acc93f1366b03168017001646c7217f7dd1f3c7cbe8a41b6a9a8484dc5ef0905R406
Is the reason probably that ConsensusCommitManager commits after one scan, whereas ConsensusCommit commits after doing several operations?
(Oops, sorry, I thought this was in the catch (CrudException e) { ... } block.)
I just thought we could immediately close the failed scanner here too, as ConsensusCommitManager.getScanner() does, for consistency and slightly faster resource release. I'm only asking out of curiosity.
Ah, sorry for the lack of explanation.
This anonymous Scanner class wraps the scanner instance returned by CrudHandler here:
https://github.com/scalar-labs/scalardb/pull/2711/files#diff-ec7fb790c921b21b0690f23d557c721585b0c83ef32b729e7dada7009d6142d0R104
The CrudHandler.getScanner() method returns an instance of ConsensusCommitStorageScanner if the scan result is not present in the snapshot.
The ConsensusCommitStorageScanner.one() method immediately closes the underlying scanner when a failure occurs:
https://github.com/scalar-labs/scalardb/pull/2711/files#diff-acc93f1366b03168017001646c7217f7dd1f3c7cbe8a41b6a9a8484dc5ef0905R424-R452
So, the behavior is consistent with ConsensusCommitManager.getScanner().
By the way, the anonymous Scanner class simply executes lazy recovery when an UncommittedRecordException is caught.
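The wrapping pattern described here can be sketched with a small decorator. Everything in it is hypothetical: `UncommittedRecordSketchException` stands in for the real exception, `OneScanner` for the scanner interface, and the recovery callback for `lazyRecovery`. The sketch only shows the "recover, then rethrow" shape and leaves closing to the wrapped scanner.

```java
import java.util.Optional;
import java.util.function.Consumer;

// Hypothetical exception standing in for UncommittedRecordException.
class UncommittedRecordSketchException extends Exception {
  UncommittedRecordSketchException(String message) {
    super(message);
  }
}

// Hypothetical single-method scanner interface.
interface OneScanner {
  Optional<String> one() throws UncommittedRecordSketchException;
}

// Decorator that triggers recovery when the wrapped scanner hits an uncommitted record.
final class RecoveringScannerSketch implements OneScanner {
  private final OneScanner delegate;
  private final Consumer<UncommittedRecordSketchException> lazyRecovery;

  RecoveringScannerSketch(
      OneScanner delegate, Consumer<UncommittedRecordSketchException> lazyRecovery) {
    this.delegate = delegate;
    this.lazyRecovery = lazyRecovery;
  }

  @Override
  public Optional<String> one() throws UncommittedRecordSketchException {
    try {
      return delegate.one();
    } catch (UncommittedRecordSketchException e) {
      // Start recovery of the uncommitted record, then let the caller see the failure.
      // Assumption: the wrapped scanner has already closed itself on failure.
      lazyRecovery.accept(e);
      throw e;
    }
  }
}
```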
Thanks for the explanation! Sounds good 💯
Overall, looking good!
I will take another look.
@Override
public List<Result> all() throws CrudException {
  List<Result> results = new ArrayList<>();
Can we return the results in the class variable by doing ImmutableList.copyOf(results) or something similar, instead of creating another list?
You’re referring to the following class variable results, right?
https://github.com/scalar-labs/scalardb/pull/2711/files#diff-acc93f1366b03168017001646c7217f7dd1f3c7cbe8a41b6a9a8484dc5ef0905R413
If so, the class variable results contains all the fetched results, but the all() method is supposed to return only the remaining results.
For example, suppose the scan result is [1, 2, 3, 4, 5]. In the following case, the all() method should return [3, 4, 5]:
Result result1 = scanner.one(); // returns 1
Result result2 = scanner.one(); // returns 2
List<Result> results = scanner.all(); // returns [3, 4, 5]
That’s why we can’t simply return something like ImmutableList.copyOf(this.results) in the all() method. Thanks!
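The "remaining results" semantics can be reproduced with a small self-contained model. `RemainingScannerSketch` is a hypothetical class over integers, not the ScalarDB scanner, and its `one()` returns an `Optional` as an assumption about the shape of the API.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;

// Hypothetical scanner whose all() drains only what one() has not consumed yet.
final class RemainingScannerSketch {
  private final Iterator<Integer> source;

  RemainingScannerSketch(List<Integer> records) {
    this.source = records.iterator();
  }

  Optional<Integer> one() {
    return source.hasNext() ? Optional.of(source.next()) : Optional.empty();
  }

  List<Integer> all() {
    // A fresh list is needed: it holds the remaining records, not everything fetched so far.
    List<Integer> remaining = new ArrayList<>();
    while (source.hasNext()) {
      remaining.add(source.next());
    }
    return remaining;
  }
}
```

Calling `one()` twice on a scanner over `[1, 2, 3, 4, 5]` and then `all()` yields `[3, 4, 5]`, matching the example in the comment above.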
Oh, OK, I misunderstood the behavior.
I don't think we can change it, for backward compatibility, but the method name might be confusing since all() sounds like it returns all results, not the remaining ones. (It's probably my mistake, though.)
LGTM! 👍
I left a minor comment, PTAL.
Besides this, LGTM, thank you.
…rudHandler.java Co-authored-by: Vincent Guilpain <[email protected]>
LGTM! Thank you!
7b91326 into add-scanner-api-to-transaction-abstraction
Description
This PR adds a scanner API implementation for Consensus Commit transactions.
Note that we are working on this feature in the add-scanner-api-to-transaction-abstraction feature branch.
Related issues and/or PRs
Changes made
Checklist
Additional notes (optional)
N/A
Release notes
N/A