diff --git a/core/src/main/java/com/scalar/db/common/error/CoreError.java b/core/src/main/java/com/scalar/db/common/error/CoreError.java index 1e465bd550..7c5a5fd1d4 100644 --- a/core/src/main/java/com/scalar/db/common/error/CoreError.java +++ b/core/src/main/java/com/scalar/db/common/error/CoreError.java @@ -911,6 +911,18 @@ public enum CoreError implements ScalarDbError { Category.USER_ERROR, "0203", "Delimiter must not be null", "", ""), DATA_LOADER_CONFIG_FILE_PATH_BLANK( Category.USER_ERROR, "0204", "Config file path must not be blank", "", ""), + CONSENSUS_COMMIT_SCANNER_NOT_CLOSED( + Category.USER_ERROR, + "0205", + "Some scanners were not closed. All scanners must be closed before committing the transaction.", + "", + ""), + TWO_PHASE_CONSENSUS_COMMIT_SCANNER_NOT_CLOSED( + Category.USER_ERROR, + "0206", + "Some scanners were not closed. All scanners must be closed before preparing the transaction.", + "", + ""), // // Errors for the concurrency error category diff --git a/core/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommit.java b/core/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommit.java index cd4c2fb4a6..c2f6f12797 100644 --- a/core/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommit.java +++ b/core/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommit.java @@ -24,8 +24,10 @@ import com.scalar.db.exception.transaction.UnsatisfiedConditionException; import com.scalar.db.util.ScalarDbUtils; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; +import java.util.Iterator; import java.util.List; import java.util.Optional; +import javax.annotation.Nonnull; import javax.annotation.Nullable; import javax.annotation.concurrent.NotThreadSafe; import org.slf4j.Logger; @@ -98,7 +100,41 @@ public List scan(Scan scan) throws CrudException { @Override public Scanner getScanner(Scan scan) throws CrudException { - throw new UnsupportedOperationException("Implement later"); + scan = copyAndSetTargetToIfNot(scan); + Scanner scanner = crud.getScanner(scan); + + return new Scanner() { + @Override + public Optional one() throws CrudException { + try { + return scanner.one(); + } catch (UncommittedRecordException e) { + lazyRecovery(e); + throw e; + } + } + + @Override + public List all() throws CrudException { + try { + return scanner.all(); + } catch (UncommittedRecordException e) { + lazyRecovery(e); + throw e; + } + } + + @Override + public void close() throws CrudException { + scanner.close(); + } + + @Nonnull + @Override + public Iterator iterator() { + return scanner.iterator(); + } + }; } /** @deprecated As of release 3.13.0. Will be removed in release 5.0.0. */ @@ -213,6 +249,10 @@ public void mutate(List mutations) throws CrudException { @Override public void commit() throws CommitException, UnknownTransactionStatusException { + if (!crud.areAllScannersClosed()) { + throw new IllegalStateException(CoreError.CONSENSUS_COMMIT_SCANNER_NOT_CLOSED.buildMessage()); + } + // Execute implicit pre-read try { crud.readIfImplicitPreReadEnabled(); @@ -234,6 +274,12 @@ public void commit() throws CommitException, UnknownTransactionStatusException { @Override public void rollback() { + try { + crud.closeScanners(); + } catch (CrudException e) { + logger.warn("Failed to close the scanner", e); + } + if (groupCommitter != null) { groupCommitter.remove(crud.getSnapshot().getId()); } diff --git a/core/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitManager.java b/core/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitManager.java index 46a13035cb..9eff205532 100644 --- a/core/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitManager.java +++ b/core/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitManager.java @@ -16,10 +16,12 @@ import com.scalar.db.api.Put; import com.scalar.db.api.Result; import com.scalar.db.api.Scan; +import com.scalar.db.api.TransactionCrudOperable; import com.scalar.db.api.TransactionState; import com.scalar.db.api.Update; import com.scalar.db.api.Upsert; import com.scalar.db.common.AbstractDistributedTransactionManager; +import com.scalar.db.common.AbstractTransactionManagerCrudOperableScanner; import com.scalar.db.config.DatabaseConfig; import com.scalar.db.exception.transaction.CommitConflictException; import com.scalar.db.exception.transaction.CrudConflictException; @@ -34,6 +36,7 @@ import java.util.List; import java.util.Optional; import java.util.UUID; +import java.util.concurrent.atomic.AtomicBoolean; import javax.annotation.Nullable; import javax.annotation.concurrent.ThreadSafe; import org.slf4j.Logger; @@ -231,9 +234,86 @@ public List scan(Scan scan) throws CrudException, UnknownTransactionStat @Override public Scanner getScanner(Scan scan) throws CrudException { - throw new UnsupportedOperationException("Implement later"); + DistributedTransaction transaction = begin(); + + TransactionCrudOperable.Scanner scanner; + try { + scanner = transaction.getScanner(copyAndSetTargetToIfNot(scan)); + } catch (CrudException e) { + rollbackTransaction(transaction); + throw e; + } + + return new AbstractTransactionManagerCrudOperableScanner() { + + private final AtomicBoolean closed = new AtomicBoolean(); + + @Override + public Optional one() throws CrudException { + try { + return scanner.one(); + } catch (CrudException e) { + closed.set(true); + + try { + scanner.close(); + } catch (CrudException ex) { + e.addSuppressed(ex); + } + + rollbackTransaction(transaction); + throw e; + } + } + + @Override + public List all() throws CrudException { + try { + return scanner.all(); + } catch (CrudException e) { + closed.set(true); + + try { + scanner.close(); + } catch (CrudException ex) { + e.addSuppressed(ex); + } + + rollbackTransaction(transaction); + throw e; + } + } + + @Override + public void close() throws CrudException, UnknownTransactionStatusException { + if (closed.get()) { + return; + } + closed.set(true); + + try { + scanner.close(); + } catch (CrudException e) { + rollbackTransaction(transaction); + throw e; + } + + try { + transaction.commit(); + } catch (CommitConflictException e) { + rollbackTransaction(transaction); + throw new CrudConflictException(e.getMessage(), e, e.getTransactionId().orElse(null)); + } catch (UnknownTransactionStatusException e) { + throw e; + } catch (TransactionException e) { + rollbackTransaction(transaction); + throw new CrudException(e.getMessage(), e, e.getTransactionId().orElse(null)); + } + } + }; } + /** @deprecated As of release 3.13.0. Will be removed in release 5.0.0. */ @Deprecated @Override public void put(Put put) throws CrudException, UnknownTransactionStatusException { @@ -244,6 +324,7 @@ public void put(Put put) throws CrudException, UnknownTransactionStatusException }); } + /** @deprecated As of release 3.13.0. Will be removed in release 5.0.0. */ @Deprecated @Override public void put(List puts) throws CrudException, UnknownTransactionStatusException { @@ -290,6 +371,7 @@ public void delete(Delete delete) throws CrudException, UnknownTransactionStatus }); } + /** @deprecated As of release 3.13.0. Will be removed in release 5.0.0. */ @Deprecated @Override public void delete(List deletes) throws CrudException, UnknownTransactionStatusException { diff --git a/core/src/main/java/com/scalar/db/transaction/consensuscommit/CrudHandler.java b/core/src/main/java/com/scalar/db/transaction/consensuscommit/CrudHandler.java index c66595ca81..ebe5da6a13 100644 --- a/core/src/main/java/com/scalar/db/transaction/consensuscommit/CrudHandler.java +++ b/core/src/main/java/com/scalar/db/transaction/consensuscommit/CrudHandler.java @@ -16,6 +16,8 @@ import com.scalar.db.api.Scanner; import com.scalar.db.api.Selection; import com.scalar.db.api.TableMetadata; +import com.scalar.db.api.TransactionCrudOperable; +import com.scalar.db.common.AbstractTransactionCrudOperableScanner; import com.scalar.db.common.error.CoreError; import com.scalar.db.exception.storage.ExecutionException; import com.scalar.db.exception.transaction.CrudException; @@ -23,10 +25,13 @@ import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import java.io.IOException; import java.util.ArrayList; +import java.util.Iterator; import java.util.LinkedHashMap; import java.util.LinkedHashSet; import java.util.List; +import java.util.Map; import java.util.Optional; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; import javax.annotation.Nullable; import javax.annotation.concurrent.NotThreadSafe; @@ -42,6 +47,7 @@ public class CrudHandler { private final boolean isIncludeMetadataEnabled; private final MutationConditionsValidator mutationConditionsValidator; private final ParallelExecutor parallelExecutor; + private final List scanners = new ArrayList<>(); @SuppressFBWarnings("EI_EXPOSE_REP2") public CrudHandler( @@ -211,6 +217,37 @@ private void processScanResult(Snapshot.Key key, Scan scan, TransactionResult re snapshot.putIntoReadSet(key, Optional.of(result)); } + public TransactionCrudOperable.Scanner getScanner(Scan originalScan) throws CrudException { + List originalProjections = new ArrayList<>(originalScan.getProjections()); + Scan scan = (Scan) prepareStorageSelection(originalScan); + + ConsensusCommitScanner scanner; + + Optional> resultsInSnapshot = + snapshot.getResults(scan); + if (resultsInSnapshot.isPresent()) { + scanner = + new ConsensusCommitSnapshotScanner(scan, originalProjections, resultsInSnapshot.get()); + } else { + scanner = new ConsensusCommitStorageScanner(scan, originalProjections); + } + + scanners.add(scanner); + return scanner; + } + + public boolean areAllScannersClosed() { + return scanners.stream().allMatch(ConsensusCommitScanner::isClosed); + } + + public void closeScanners() throws CrudException { + for (ConsensusCommitScanner scanner : scanners) { + if (!scanner.isClosed()) { + scanner.close(); + } + } + } + public void put(Put put) throws CrudException { Snapshot.Key key = new Snapshot.Key(put); @@ -360,4 +397,169 @@ private TableMetadata getTableMetadata(Operation operation) throws CrudException public Snapshot getSnapshot() { return snapshot; } + + private interface ConsensusCommitScanner extends TransactionCrudOperable.Scanner { + boolean isClosed(); + } + + @NotThreadSafe + private class ConsensusCommitStorageScanner extends AbstractTransactionCrudOperableScanner + implements ConsensusCommitScanner { + + private final Scan scan; + private final List originalProjections; + private final Scanner scanner; + + private final LinkedHashMap results = new LinkedHashMap<>(); + private final AtomicBoolean fullyScanned = new AtomicBoolean(); + private final AtomicBoolean closed = new AtomicBoolean(); + + public ConsensusCommitStorageScanner(Scan scan, List originalProjections) + throws CrudException { + this.scan = scan; + this.originalProjections = originalProjections; + scanner = scanFromStorage(scan); + } + + @Override + public Optional one() throws CrudException { + try { + Optional r = scanner.one(); + + if (!r.isPresent()) { + fullyScanned.set(true); + return Optional.empty(); + } + + Snapshot.Key key = new Snapshot.Key(scan, r.get()); + TransactionResult result = new TransactionResult(r.get()); + processScanResult(key, scan, result); + results.put(key, result); + + TableMetadata metadata = getTableMetadata(scan); + return Optional.of( + new FilteredResult(result, originalProjections, metadata, isIncludeMetadataEnabled)); + } catch (ExecutionException e) { + closeScanner(); + throw new CrudException( + CoreError.CONSENSUS_COMMIT_SCANNING_RECORDS_FROM_STORAGE_FAILED.buildMessage(), + e, + snapshot.getId()); + } catch (CrudException e) { + closeScanner(); + throw e; + } + } + + @Override + public List all() throws CrudException { + List results = new ArrayList<>(); + + while (true) { + Optional result = one(); + if (!result.isPresent()) { + break; + } + results.add(result.get()); + } + + return results; + } + + @Override + public void close() { + if (closed.get()) { + return; + } + + closeScanner(); + + if (fullyScanned.get()) { + // If the scanner is fully scanned, we can treat it as a normal scan, and put the results + // into the scan set + snapshot.putIntoScanSet(scan, results); + } else { + // If the scanner is not fully scanned, put the results into the scanner set + snapshot.putIntoScannerSet(scan, results); + } + + snapshot.verifyNoOverlap(scan, results); + } + + @Override + public boolean isClosed() { + return closed.get(); + } + + private void closeScanner() { + closed.set(true); + try { + scanner.close(); + } catch (IOException e) { + logger.warn("Failed to close the scanner", e); + } + } + } + + @NotThreadSafe + private class ConsensusCommitSnapshotScanner extends AbstractTransactionCrudOperableScanner + implements ConsensusCommitScanner { + + private final Scan scan; + private final List originalProjections; + private final Iterator> resultsIterator; + + private final LinkedHashMap results = new LinkedHashMap<>(); + private boolean closed; + + public ConsensusCommitSnapshotScanner( + Scan scan, + List originalProjections, + LinkedHashMap resultsInSnapshot) { + this.scan = scan; + this.originalProjections = originalProjections; + resultsIterator = resultsInSnapshot.entrySet().iterator(); + } + + @Override + public Optional one() throws CrudException { + if (!resultsIterator.hasNext()) { + return Optional.empty(); + } + + Map.Entry entry = resultsIterator.next(); + results.put(entry.getKey(), entry.getValue()); + + TableMetadata metadata = getTableMetadata(scan); + return Optional.of( + new FilteredResult( + entry.getValue(), originalProjections, metadata, isIncludeMetadataEnabled)); + } + + @Override + public List all() throws CrudException { + List results = new ArrayList<>(); + + while (true) { + Optional result = one(); + if (!result.isPresent()) { + break; + } + results.add(result.get()); + } + + return results; + } + + @Override + public void close() { + closed = true; + snapshot.verifyNoOverlap(scan, results); + } + + @Override + public boolean isClosed() { + return closed; + } + } } diff --git a/core/src/main/java/com/scalar/db/transaction/consensuscommit/Snapshot.java b/core/src/main/java/com/scalar/db/transaction/consensuscommit/Snapshot.java index cf535281ab..24b16e59c7 100644 --- a/core/src/main/java/com/scalar/db/transaction/consensuscommit/Snapshot.java +++ b/core/src/main/java/com/scalar/db/transaction/consensuscommit/Snapshot.java @@ -64,6 +64,9 @@ public class Snapshot { private final Map writeSet; private final Map deleteSet; + // The scanner set used to store information about scanners that are not fully scanned + private final List scannerSet; + public Snapshot( String id, Isolation isolation, @@ -78,6 +81,7 @@ public Snapshot( scanSet = new HashMap<>(); writeSet = new HashMap<>(); deleteSet = new HashMap<>(); + scannerSet = new ArrayList<>(); } @VisibleForTesting @@ -90,7 +94,8 @@ public Snapshot( ConcurrentMap> getSet, Map> scanSet, Map writeSet, - Map deleteSet) { + Map deleteSet, + List scannerSet) { this.id = id; this.isolation = isolation; this.tableMetadataManager = tableMetadataManager; @@ -100,6 +105,7 @@ public Snapshot( this.scanSet = scanSet; this.writeSet = writeSet; this.deleteSet = deleteSet; + this.scannerSet = scannerSet; } @Nonnull @@ -173,6 +179,10 @@ public void putIntoDeleteSet(Key key, Delete delete) { deleteSet.put(key, delete); } + public void putIntoScannerSet(Scan scan, LinkedHashMap results) { + scannerSet.add(new ScannerInfo(scan, results)); + } + public List getPutsInWriteSet() { return new ArrayList<>(writeSet.values()); } @@ -485,7 +495,12 @@ void toSerializable(DistributedStorage storage) // Scan set is re-validated to check if there is no anti-dependency for (Map.Entry> entry : scanSet.entrySet()) { - tasks.add(() -> validateScanResults(storage, entry.getKey(), entry.getValue())); + tasks.add(() -> validateScanResults(storage, entry.getKey(), entry.getValue(), false)); + } + + // Scanner set is re-validated to check if there is no anti-dependency + for (ScannerInfo scannerInfo : scannerSet) { + tasks.add(() -> validateScanResults(storage, scannerInfo.scan, scannerInfo.results, true)); } // Get set is re-validated to check if there is no anti-dependency @@ -527,11 +542,16 @@ void toSerializable(DistributedStorage storage) * @param storage a distributed storage * @param scan the scan to be validated * @param results the results of the scan + * @param notFullyScannedScanner if this is a validation for a scanner that has not been fully + * scanned * @throws ExecutionException if a storage operation fails * @throws ValidationConflictException if the scan results are changed by another transaction */ private void validateScanResults( - DistributedStorage storage, Scan scan, LinkedHashMap results) + DistributedStorage storage, + Scan scan, + LinkedHashMap results, + boolean notFullyScannedScanner) throws ExecutionException, ValidationConflictException { Scanner scanner = null; try { @@ -604,6 +624,11 @@ private void validateScanResults( return; } + if (notFullyScannedScanner) { + // If the scanner is not fully scanned, no further checks are needed + return; + } + // Check if there are any remaining records in the latest scan results while (latestResult.isPresent()) { TransactionResult latestTxResult = new TransactionResult(latestResult.get()); @@ -654,7 +679,7 @@ private void validateGetWithIndexResult( originalResult.ifPresent(r -> results.put(new Snapshot.Key(scanWithIndex, r), r)); // Validate the result to check if there is no anti-dependency - validateScanResults(storage, scanWithIndex, results); + validateScanResults(storage, scanWithIndex, results, false); } private void validateGetResult( @@ -842,4 +867,32 @@ public int hashCode() { return Objects.hash(transactionId, readSetMap, writeSet, deleteSet); } } + + @VisibleForTesting + static class ScannerInfo { + public final Scan scan; + public final LinkedHashMap results; + + public ScannerInfo(Scan scan, LinkedHashMap results) { + this.scan = scan; + this.results = results; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof ScannerInfo)) { + return false; + } + ScannerInfo that = (ScannerInfo) o; + return Objects.equals(scan, that.scan) && Objects.equals(results, that.results); + } + + @Override + public int hashCode() { + return Objects.hash(scan, results); + } + } } diff --git a/core/src/main/java/com/scalar/db/transaction/consensuscommit/TwoPhaseConsensusCommit.java b/core/src/main/java/com/scalar/db/transaction/consensuscommit/TwoPhaseConsensusCommit.java index d8edb63291..dd72f03b4d 100644 --- a/core/src/main/java/com/scalar/db/transaction/consensuscommit/TwoPhaseConsensusCommit.java +++ b/core/src/main/java/com/scalar/db/transaction/consensuscommit/TwoPhaseConsensusCommit.java @@ -27,8 +27,10 @@ import com.scalar.db.exception.transaction.ValidationException; import com.scalar.db.util.ScalarDbUtils; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; +import java.util.Iterator; import java.util.List; import java.util.Optional; +import javax.annotation.Nonnull; import javax.annotation.concurrent.NotThreadSafe; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -88,7 +90,41 @@ public List scan(Scan scan) throws CrudException { @Override public Scanner getScanner(Scan scan) throws CrudException { - throw new UnsupportedOperationException("Implement later"); + scan = copyAndSetTargetToIfNot(scan); + Scanner scanner = crud.getScanner(scan); + + return new Scanner() { + @Override + public Optional one() throws CrudException { + try { + return scanner.one(); + } catch (UncommittedRecordException e) { + lazyRecovery(e); + throw e; + } + } + + @Override + public List all() throws CrudException { + try { + return scanner.all(); + } catch (UncommittedRecordException e) { + lazyRecovery(e); + throw e; + } + } + + @Override + public void close() throws CrudException { + scanner.close(); + } + + @Nonnull + @Override + public Iterator iterator() { + return scanner.iterator(); + } + }; } /** @deprecated As of release 3.13.0. Will be removed in release 5.0.0. */ @@ -211,6 +247,11 @@ public void mutate(List mutations) throws CrudException { @Override public void prepare() throws PreparationException { + if (!crud.areAllScannersClosed()) { + throw new IllegalStateException( + CoreError.TWO_PHASE_CONSENSUS_COMMIT_SCANNER_NOT_CLOSED.buildMessage()); + } + // Execute implicit pre-read try { crud.readIfImplicitPreReadEnabled(); @@ -261,6 +302,12 @@ public void commit() throws CommitConflictException, UnknownTransactionStatusExc @Override public void rollback() throws RollbackException { + try { + crud.closeScanners(); + } catch (CrudException e) { + logger.warn("Failed to close the scanner", e); + } + if (!needRollback) { return; } diff --git a/core/src/main/java/com/scalar/db/transaction/consensuscommit/TwoPhaseConsensusCommitManager.java b/core/src/main/java/com/scalar/db/transaction/consensuscommit/TwoPhaseConsensusCommitManager.java index 3c9a47aa60..1097f0b62f 100644 --- a/core/src/main/java/com/scalar/db/transaction/consensuscommit/TwoPhaseConsensusCommitManager.java +++ b/core/src/main/java/com/scalar/db/transaction/consensuscommit/TwoPhaseConsensusCommitManager.java @@ -13,10 +13,12 @@ import com.scalar.db.api.Put; import com.scalar.db.api.Result; import com.scalar.db.api.Scan; +import com.scalar.db.api.TransactionCrudOperable; import com.scalar.db.api.TransactionState; import com.scalar.db.api.TwoPhaseCommitTransaction; import com.scalar.db.api.Update; import com.scalar.db.api.Upsert; +import com.scalar.db.common.AbstractTransactionManagerCrudOperableScanner; import com.scalar.db.common.AbstractTwoPhaseCommitTransactionManager; import com.scalar.db.common.error.CoreError; import com.scalar.db.config.DatabaseConfig; @@ -35,6 +37,7 @@ import java.util.List; import java.util.Optional; import java.util.UUID; +import java.util.concurrent.atomic.AtomicBoolean; import javax.annotation.concurrent.ThreadSafe; import javax.inject.Inject; import org.slf4j.Logger; @@ -188,9 +191,90 @@ public List scan(Scan scan) throws CrudException, UnknownTransactionStat @Override public Scanner getScanner(Scan scan) throws CrudException { - throw new UnsupportedOperationException("Implement later"); + TwoPhaseCommitTransaction transaction = begin(); + + TransactionCrudOperable.Scanner scanner; + try { + scanner = transaction.getScanner(copyAndSetTargetToIfNot(scan)); + } catch (CrudException e) { + rollbackTransaction(transaction); + throw e; + } + + return new AbstractTransactionManagerCrudOperableScanner() { + + private final AtomicBoolean closed = new AtomicBoolean(); + + @Override + public Optional one() throws CrudException { + try { + return scanner.one(); + } catch (CrudException e) { + closed.set(true); + + try { + scanner.close(); + } catch (CrudException ex) { + e.addSuppressed(ex); + } + + rollbackTransaction(transaction); + throw e; + } + } + + @Override + public List all() throws CrudException { + try { + return scanner.all(); + } catch (CrudException e) { + closed.set(true); + + try { + scanner.close(); + } catch (CrudException ex) { + e.addSuppressed(ex); + } + + rollbackTransaction(transaction); + throw e; + } + } + + @Override + public void close() throws CrudException, UnknownTransactionStatusException { + if (closed.get()) { + return; + } + closed.set(true); + + try { + scanner.close(); + } catch (CrudException e) { + rollbackTransaction(transaction); + throw e; + } + + try { + transaction.prepare(); + transaction.validate(); + transaction.commit(); + } catch (PreparationConflictException + | ValidationConflictException + | CommitConflictException e) { + rollbackTransaction(transaction); + throw new CrudConflictException(e.getMessage(), e, e.getTransactionId().orElse(null)); + } catch (UnknownTransactionStatusException e) { + throw e; + } catch (TransactionException e) { + rollbackTransaction(transaction); + throw new CrudException(e.getMessage(), e, e.getTransactionId().orElse(null)); + } + } + }; } + /** @deprecated As of release 3.13.0. Will be removed in release 5.0.0. */ @Deprecated @Override public void put(Put put) throws CrudException, UnknownTransactionStatusException { @@ -201,6 +285,7 @@ public void put(Put put) throws CrudException, UnknownTransactionStatusException }); } + /** @deprecated As of release 3.13.0. Will be removed in release 5.0.0. */ @Deprecated @Override public void put(List puts) throws CrudException, UnknownTransactionStatusException { @@ -247,6 +332,7 @@ public void delete(Delete delete) throws CrudException, UnknownTransactionStatus }); } + /** @deprecated As of release 3.13.0. Will be removed in release 5.0.0. */ @Deprecated @Override public void delete(List deletes) throws CrudException, UnknownTransactionStatusException { diff --git a/core/src/test/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitManagerTest.java b/core/src/test/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitManagerTest.java index 4b363a1b35..0260c70309 100644 --- a/core/src/test/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitManagerTest.java +++ b/core/src/test/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitManagerTest.java @@ -22,6 +22,8 @@ import com.scalar.db.api.Put; import com.scalar.db.api.Result; import com.scalar.db.api.Scan; +import com.scalar.db.api.TransactionCrudOperable; +import com.scalar.db.api.TransactionManagerCrudOperable; import com.scalar.db.api.TransactionState; import com.scalar.db.api.Update; import com.scalar.db.api.Upsert; @@ -38,6 +40,8 @@ import com.scalar.db.transaction.consensuscommit.Coordinator.State; import com.scalar.db.transaction.consensuscommit.CoordinatorGroupCommitter.CoordinatorGroupCommitKeyManipulator; import java.util.Arrays; +import java.util.Collections; +import java.util.Iterator; import java.util.List; import java.util.Optional; import org.junit.jupiter.api.BeforeEach; @@ -666,6 +670,338 @@ public void scan_ShouldScan() throws TransactionException { assertThat(actual).isEqualTo(results); } + @Test + public void getScannerAndScannerOne_ShouldReturnScannerAndReturnProperResult() throws Exception { + // Arrange + DistributedTransaction transaction = mock(DistributedTransaction.class); + + ConsensusCommitManager spied = spy(manager); + doReturn(transaction).when(spied).begin(); + + Scan scan = + Scan.newBuilder() + .namespace("ns") + .table("tbl") + .partitionKey(Key.ofText("p1", "val")) + .build(); + + TransactionCrudOperable.Scanner scanner = mock(TransactionCrudOperable.Scanner.class); + when(transaction.getScanner(scan)).thenReturn(scanner); + + Result result1 = mock(Result.class); + Result result2 = mock(Result.class); + Result result3 = mock(Result.class); + + when(scanner.one()) + .thenReturn(Optional.of(result1)) + .thenReturn(Optional.of(result2)) + .thenReturn(Optional.of(result3)) + .thenReturn(Optional.empty()); + + // Act Assert + TransactionManagerCrudOperable.Scanner actual = spied.getScanner(scan); + assertThat(actual.one()).hasValue(result1); + assertThat(actual.one()).hasValue(result2); + assertThat(actual.one()).hasValue(result3); + assertThat(actual.one()).isEmpty(); + actual.close(); + + verify(spied).begin(); + verify(transaction).commit(); + verify(scanner).close(); + } + + @Test + public void getScannerAndScannerAll_ShouldReturnScannerAndReturnProperResults() throws Exception { + // Arrange + DistributedTransaction transaction = mock(DistributedTransaction.class); + + ConsensusCommitManager spied = spy(manager); + doReturn(transaction).when(spied).begin(); + + Scan scan = + Scan.newBuilder() + .namespace("ns") + .table("tbl") + .partitionKey(Key.ofText("p1", "val")) + .build(); + + TransactionCrudOperable.Scanner scanner = mock(TransactionCrudOperable.Scanner.class); + when(transaction.getScanner(scan)).thenReturn(scanner); + + Result result1 = mock(Result.class); + Result result2 = mock(Result.class); + Result result3 = mock(Result.class); + + when(scanner.all()) + .thenReturn(Arrays.asList(result1, result2, result3)) + .thenReturn(Collections.emptyList()); + + // Act Assert + TransactionManagerCrudOperable.Scanner actual = spied.getScanner(scan); + List results = actual.all(); + assertThat(results).containsExactly(result1, result2, result3); + assertThat(actual.all()).isEmpty(); + actual.close(); + + verify(spied).begin(); + verify(transaction).commit(); + verify(scanner).close(); + } + + @Test + public void getScannerAndScannerIterator_ShouldReturnScannerAndReturnProperResults() + throws Exception { + // Arrange + DistributedTransaction transaction = mock(DistributedTransaction.class); + + ConsensusCommitManager spied = spy(manager); + doReturn(transaction).when(spied).begin(); + + Scan scan = + Scan.newBuilder() + .namespace("ns") + .table("tbl") + .partitionKey(Key.ofText("p1", "val")) + .build(); + + TransactionCrudOperable.Scanner scanner = mock(TransactionCrudOperable.Scanner.class); + when(transaction.getScanner(scan)).thenReturn(scanner); + + Result result1 = mock(Result.class); + Result result2 = mock(Result.class); + Result result3 = mock(Result.class); + + when(scanner.one()) + .thenReturn(Optional.of(result1)) + .thenReturn(Optional.of(result2)) + .thenReturn(Optional.of(result3)) + .thenReturn(Optional.empty()); + + // Act Assert + TransactionManagerCrudOperable.Scanner actual = spied.getScanner(scan); + + Iterator iterator = actual.iterator(); + assertThat(iterator.hasNext()).isTrue(); + assertThat(iterator.next()).isEqualTo(result1); + assertThat(iterator.hasNext()).isTrue(); + assertThat(iterator.next()).isEqualTo(result2); + assertThat(iterator.hasNext()).isTrue(); + assertThat(iterator.next()).isEqualTo(result3); + assertThat(iterator.hasNext()).isFalse(); + actual.close(); + + verify(spied).begin(); + verify(transaction).commit(); + verify(scanner).close(); + } + + @Test + public void + getScanner_CrudExceptionThrownByTransactionGetScanner_ShouldRollbackTransactionAndThrowCrudException() + throws TransactionException { + // Arrange + DistributedTransaction transaction = mock(DistributedTransaction.class); + + ConsensusCommitManager spied = spy(manager); + doReturn(transaction).when(spied).begin(); + + Scan scan = + Scan.newBuilder() + .namespace("ns") + .table("tbl") + .partitionKey(Key.ofText("p1", "val")) + .build(); + + when(transaction.getScanner(scan)).thenThrow(CrudException.class); + + // Act Assert + assertThatThrownBy(() -> spied.getScanner(scan)).isInstanceOf(CrudException.class); + + verify(spied).begin(); + verify(transaction).rollback(); + } + + @Test + public void + getScannerAndScannerOne_CrudExceptionThrownByScannerOne_ShouldRollbackTransactionAndThrowCrudException() + throws TransactionException { + // Arrange + DistributedTransaction transaction = mock(DistributedTransaction.class); + + ConsensusCommitManager spied = spy(manager); + doReturn(transaction).when(spied).begin(); + + Scan scan = + Scan.newBuilder() + .namespace("ns") + .table("tbl") + .partitionKey(Key.ofText("p1", "val")) + .build(); + + TransactionCrudOperable.Scanner scanner = mock(TransactionCrudOperable.Scanner.class); + when(transaction.getScanner(scan)).thenReturn(scanner); + + when(scanner.one()).thenThrow(CrudException.class); + + // Act Assert + TransactionManagerCrudOperable.Scanner actual = spied.getScanner(scan); + assertThatThrownBy(actual::one).isInstanceOf(CrudException.class); + + verify(spied).begin(); + verify(scanner).close(); + verify(transaction).rollback(); + } + + @Test + public void + getScannerAndScannerAll_CrudExceptionThrownByScannerAll_ShouldRollbackTransactionAndThrowCrudException() + throws TransactionException { + // Arrange + DistributedTransaction transaction = mock(DistributedTransaction.class); + ConsensusCommitManager spied = spy(manager); + doReturn(transaction).when(spied).begin(); + + Scan scan = + Scan.newBuilder() + .namespace("ns") + .table("tbl") + .partitionKey(Key.ofText("p1", "val")) + .build(); + + TransactionCrudOperable.Scanner scanner = mock(TransactionCrudOperable.Scanner.class); + when(transaction.getScanner(scan)).thenReturn(scanner); + + when(scanner.all()).thenThrow(CrudException.class); + + // Act Assert + TransactionManagerCrudOperable.Scanner actual = spied.getScanner(scan); + assertThatThrownBy(actual::all).isInstanceOf(CrudException.class); + + verify(spied).begin(); + verify(scanner).close(); + verify(transaction).rollback(); + } + + @Test + public void + getScannerAndScannerClose_CrudExceptionThrownByScannerClose_ShouldRollbackTransactionAndThrowCrudException() + throws TransactionException { + // Arrange + DistributedTransaction transaction = mock(DistributedTransaction.class); + ConsensusCommitManager spied = spy(manager); + doReturn(transaction).when(spied).begin(); + + Scan scan = + Scan.newBuilder() + .namespace("ns") + .table("tbl") + .partitionKey(Key.ofText("p1", "val")) + .build(); + + TransactionCrudOperable.Scanner scanner = mock(TransactionCrudOperable.Scanner.class); + when(transaction.getScanner(scan)).thenReturn(scanner); + + doThrow(CrudException.class).when(scanner).close(); + + // Act Assert + TransactionManagerCrudOperable.Scanner actual = spied.getScanner(scan); + assertThatThrownBy(actual::close).isInstanceOf(CrudException.class); + + verify(spied).begin(); + verify(scanner).close(); + verify(transaction).rollback(); + } + + @Test + public void + getScannerAndScannerClose_CommitConflictExceptionThrownByTransactionCommit_ShouldRollbackTransactionAndThrowCrudConflictException() + throws TransactionException { + // Arrange + DistributedTransaction transaction = mock(DistributedTransaction.class); + + ConsensusCommitManager spied = spy(manager); + doReturn(transaction).when(spied).begin(); + doThrow(CommitConflictException.class).when(transaction).commit(); + + Scan scan = + Scan.newBuilder() + .namespace("ns") + .table("tbl") + .partitionKey(Key.ofText("p1", "val")) + .build(); + + TransactionCrudOperable.Scanner scanner = mock(TransactionCrudOperable.Scanner.class); + when(transaction.getScanner(scan)).thenReturn(scanner); + + // Act Assert + TransactionManagerCrudOperable.Scanner actual = spied.getScanner(scan); + assertThatThrownBy(actual::close).isInstanceOf(CrudConflictException.class); + + verify(spied).begin(); + verify(scanner).close(); + verify(transaction).rollback(); + } + + @Test + public void + getScannerAndScannerClose_UnknownTransactionStatusExceptionByTransactionCommit_ShouldThrowUnknownTransactionStatusException() + throws TransactionException { + // Arrange + DistributedTransaction transaction = mock(DistributedTransaction.class); + + ConsensusCommitManager spied = spy(manager); + doReturn(transaction).when(spied).begin(); + doThrow(UnknownTransactionStatusException.class).when(transaction).commit(); + + Scan scan = + Scan.newBuilder() + .namespace("ns") + .table("tbl") + .partitionKey(Key.ofText("p1", "val")) + .build(); + + TransactionCrudOperable.Scanner scanner = mock(TransactionCrudOperable.Scanner.class); + when(transaction.getScanner(scan)).thenReturn(scanner); + + // Act Assert + TransactionManagerCrudOperable.Scanner actual = spied.getScanner(scan); + assertThatThrownBy(actual::close).isInstanceOf(UnknownTransactionStatusException.class); + + verify(spied).begin(); + verify(scanner).close(); + } + + @Test + public void + getScannerAndScannerClose_CommitExceptionThrownByTransactionCommit_ShouldRollbackTransactionAndThrowCrudException() + throws TransactionException { + // Arrange + DistributedTransaction transaction = mock(DistributedTransaction.class); + + ConsensusCommitManager spied = spy(manager); + doReturn(transaction).when(spied).begin(); + doThrow(CommitException.class).when(transaction).commit(); + + Scan scan = + Scan.newBuilder() + .namespace("ns") + .table("tbl") + .partitionKey(Key.ofText("p1", "val")) + .build(); + + TransactionCrudOperable.Scanner scanner = mock(TransactionCrudOperable.Scanner.class); + when(transaction.getScanner(scan)).thenReturn(scanner); + + // Act Assert + TransactionManagerCrudOperable.Scanner actual = spied.getScanner(scan); + assertThatThrownBy(actual::close).isInstanceOf(CrudException.class); + + verify(spied).begin(); + verify(scanner).close(); + verify(transaction).rollback(); + } + @Test public void put_ShouldPut() throws TransactionException { // Arrange diff --git a/core/src/test/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitTest.java b/core/src/test/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitTest.java index 536d2ebfe5..f81a85b70b 100644 --- a/core/src/test/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitTest.java +++ b/core/src/test/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitTest.java @@ -21,6 +21,7 @@ import com.scalar.db.api.Put; import com.scalar.db.api.Result; import com.scalar.db.api.Scan; +import com.scalar.db.api.TransactionCrudOperable; import com.scalar.db.api.Update; import com.scalar.db.api.Upsert; import com.scalar.db.exception.storage.ExecutionException; @@ -68,6 +69,9 @@ public class ConsensusCommitTest { @BeforeEach public void setUp() throws Exception { MockitoAnnotations.openMocks(this).close(); + + // Arrange + when(crud.areAllScannersClosed()).thenReturn(true); } private Get prepareGet() { @@ -164,6 +168,93 @@ public void scan_ScanForUncommittedRecordGiven_ShouldRecoverRecord() throws Crud verify(recovery).recover(scan, result); } + @Test + public void getScannerAndScannerOne_ShouldCallCrudHandlerGetScannerAndScannerOne() + throws CrudException { + // Arrange + Scan scan = prepareScan(); + TransactionCrudOperable.Scanner scanner = mock(TransactionCrudOperable.Scanner.class); + Result result = mock(Result.class); + when(scanner.one()).thenReturn(Optional.of(result)); + when(crud.getScanner(scan)).thenReturn(scanner); + + // Act + TransactionCrudOperable.Scanner actualScanner = consensus.getScanner(scan); + Optional actualResult = actualScanner.one(); + + // Assert + assertThat(actualResult).hasValue(result); + verify(crud).getScanner(scan); + verify(scanner).one(); + } + + @Test + public void + getScannerAndScannerOne_UncommittedRecordExceptionThrownByScannerOne_ShouldRecoverRecord() + throws CrudException { + // Arrange + Scan scan = prepareScan(); + + UncommittedRecordException toThrow = mock(UncommittedRecordException.class); + TransactionResult result = mock(TransactionResult.class); + when(toThrow.getSelection()).thenReturn(scan); + when(toThrow.getResults()).thenReturn(Collections.singletonList(result)); + + TransactionCrudOperable.Scanner scanner = mock(TransactionCrudOperable.Scanner.class); + when(scanner.one()).thenThrow(toThrow); + when(crud.getScanner(scan)).thenReturn(scanner); + + // Act Assert + TransactionCrudOperable.Scanner actualScanner = consensus.getScanner(scan); + assertThatThrownBy(actualScanner::one).isInstanceOf(UncommittedRecordException.class); + + verify(recovery).recover(scan, result); + } + + @Test + public void getScannerAndScannerAll_ShouldCallCrudHandlerGetScannerAndScannerAll() + throws CrudException { + // Arrange + Scan scan = prepareScan(); + TransactionCrudOperable.Scanner scanner = mock(TransactionCrudOperable.Scanner.class); + Result result1 = mock(Result.class); + Result result2 = mock(Result.class); + when(scanner.all()).thenReturn(Arrays.asList(result1, result2)); + when(crud.getScanner(scan)).thenReturn(scanner); + + // Act + TransactionCrudOperable.Scanner actualScanner = consensus.getScanner(scan); + List actualResults = actualScanner.all(); + + // Assert + assertThat(actualResults).containsExactly(result1, result2); + verify(crud).getScanner(scan); + verify(scanner).all(); + } + + @Test + public void + getScannerAndScannerAll_UncommittedRecordExceptionThrownByScannerAll_ShouldRecoverRecord() + throws CrudException { + // Arrange + Scan scan = prepareScan(); + + UncommittedRecordException toThrow = mock(UncommittedRecordException.class); + TransactionResult result = mock(TransactionResult.class); + when(toThrow.getSelection()).thenReturn(scan); + when(toThrow.getResults()).thenReturn(Collections.singletonList(result)); + + TransactionCrudOperable.Scanner scanner = mock(TransactionCrudOperable.Scanner.class); + when(scanner.all()).thenThrow(toThrow); + when(crud.getScanner(scan)).thenReturn(scanner); + + // Act Assert + TransactionCrudOperable.Scanner actualScanner = consensus.getScanner(scan); + assertThatThrownBy(actualScanner::all).isInstanceOf(UncommittedRecordException.class); + + verify(recovery).recover(scan, result); + } + @Test public void put_PutGiven_ShouldCallCrudHandlerPut() throws ExecutionException, CrudException { // Arrange @@ -674,21 +765,30 @@ public void commit_ProcessedCrudGiven_ShouldCommitWithSnapshot() } @Test - public void rollback_WithoutGroupCommitter_ShouldDoNothing() - throws UnknownTransactionStatusException { + public void commit_ScannerNotClosed_ShouldThrowIllegalStateException() { + // Arrange + when(crud.areAllScannersClosed()).thenReturn(false); + + // Act Assert + assertThatThrownBy(() -> consensus.commit()).isInstanceOf(IllegalStateException.class); + } + + @Test + public void rollback_ShouldDoNothing() throws CrudException, UnknownTransactionStatusException { // Arrange // Act consensus.rollback(); // Assert + verify(crud).closeScanners(); verify(commit, never()).rollbackRecords(any(Snapshot.class)); verify(commit, never()).abortState(anyString()); } @Test public void rollback_WithGroupCommitter_ShouldRemoveTxFromGroupCommitter() - throws UnknownTransactionStatusException { + throws CrudException, UnknownTransactionStatusException { // Arrange String txId = "tx-id"; Snapshot snapshot = mock(Snapshot.class); @@ -702,6 +802,7 @@ public void rollback_WithGroupCommitter_ShouldRemoveTxFromGroupCommitter() consensusWithGroupCommit.rollback(); // Assert + verify(crud).closeScanners(); verify(groupCommitter).remove(txId); verify(commit, never()).rollbackRecords(any(Snapshot.class)); verify(commit, never()).abortState(anyString()); diff --git a/core/src/test/java/com/scalar/db/transaction/consensuscommit/CrudHandlerTest.java b/core/src/test/java/com/scalar/db/transaction/consensuscommit/CrudHandlerTest.java index 933d40b5d6..689ede3bb9 100644 --- a/core/src/test/java/com/scalar/db/transaction/consensuscommit/CrudHandlerTest.java +++ b/core/src/test/java/com/scalar/db/transaction/consensuscommit/CrudHandlerTest.java @@ -24,6 +24,7 @@ import com.scalar.db.api.ScanAll; import com.scalar.db.api.Scanner; import com.scalar.db.api.TableMetadata; +import com.scalar.db.api.TransactionCrudOperable; import com.scalar.db.api.TransactionState; import com.scalar.db.common.ResultImpl; import com.scalar.db.exception.storage.ExecutionException; @@ -33,6 +34,8 @@ import com.scalar.db.io.Key; import com.scalar.db.io.TextColumn; import com.scalar.db.util.ScalarDbUtils; +import java.io.IOException; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; @@ -44,8 +47,9 @@ import java.util.concurrent.ConcurrentMap; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; import org.mockito.ArgumentCaptor; -import org.mockito.ArgumentMatchers; import org.mockito.Mock; import org.mockito.MockitoAnnotations; @@ -231,8 +235,8 @@ public void get_GetExistsInSnapshot_ShouldReturnFromSnapshot() throws CrudExcept assertThat(exception.getResults().get(0)).isEqualTo(result); }); - verify(snapshot, never()).putIntoReadSet(any(), ArgumentMatchers.any()); - verify(snapshot, never()).putIntoGetSet(any(), ArgumentMatchers.any()); + verify(snapshot, never()).putIntoReadSet(any(), any()); + verify(snapshot, never()).putIntoGetSet(any(), any()); } @Test @@ -345,23 +349,29 @@ public void get_ForNonExistingTable_ShouldThrowIllegalArgumentException() assertThatThrownBy(() -> handler.get(get)).isInstanceOf(IllegalArgumentException.class); } - @Test - public void scan_ResultGivenFromStorage_ShouldUpdateSnapshotAndReturn() - throws ExecutionException, CrudException { + @ParameterizedTest + @EnumSource(ScanType.class) + void scanOrGetScanner_ResultGivenFromStorage_ShouldUpdateSnapshotAndReturn(ScanType scanType) + throws ExecutionException, CrudException, IOException { // Arrange Scan scan = prepareScan(); Scan scanForStorage = toScanForStorageFrom(scan); result = prepareResult(TransactionState.COMMITTED); Snapshot.Key key = new Snapshot.Key(scan, result); TransactionResult expected = new TransactionResult(result); - when(scanner.iterator()).thenReturn(Collections.singletonList(result).iterator()); + if (scanType == ScanType.SCAN) { + when(scanner.iterator()).thenReturn(Collections.singletonList(result).iterator()); + } else { + when(scanner.one()).thenReturn(Optional.of(result)).thenReturn(Optional.empty()); + } when(storage.scan(scanForStorage)).thenReturn(scanner); when(snapshot.getResult(any())).thenReturn(Optional.of(expected)); // Act - List results = handler.scan(scan); + List results = scanOrGetScanner(scan, scanType); // Assert + verify(scanner).close(); verify(snapshot).putIntoReadSet(key, Optional.of(expected)); verify(snapshot).putIntoScanSet(scan, Maps.newLinkedHashMap(ImmutableMap.of(key, expected))); verify(snapshot).verifyNoOverlap(scan, ImmutableMap.of(key, expected)); @@ -370,19 +380,24 @@ public void scan_ResultGivenFromStorage_ShouldUpdateSnapshotAndReturn() .isEqualTo(new FilteredResult(expected, Collections.emptyList(), TABLE_METADATA, false)); } - @Test - public void - scan_PreparedResultGivenFromStorage_ShouldNeverUpdateSnapshotThrowUncommittedRecordException() - throws ExecutionException { + @ParameterizedTest + @EnumSource(ScanType.class) + void + scanOrGetScanner_PreparedResultGivenFromStorage_ShouldNeverUpdateSnapshotThrowUncommittedRecordException( + ScanType scanType) throws ExecutionException, IOException { // Arrange Scan scan = prepareScan(); Scan scanForStorage = toScanForStorageFrom(scan); result = prepareResult(TransactionState.PREPARED); - when(scanner.iterator()).thenReturn(Collections.singletonList(result).iterator()); + if (scanType == ScanType.SCAN) { + when(scanner.iterator()).thenReturn(Collections.singletonList(result).iterator()); + } else { + when(scanner.one()).thenReturn(Optional.of(result)).thenReturn(Optional.empty()); + } when(storage.scan(scanForStorage)).thenReturn(scanner); // Act Assert - assertThatThrownBy(() -> handler.scan(scan)) + assertThatThrownBy(() -> scanOrGetScanner(scan, scanType)) .isInstanceOf(UncommittedRecordException.class) .satisfies( e -> { @@ -392,13 +407,15 @@ public void scan_ResultGivenFromStorage_ShouldUpdateSnapshotAndReturn() assertThat(exception.getResults().get(0)).isEqualTo(result); }); - verify(snapshot, never()).putIntoReadSet(any(), ArgumentMatchers.any()); - verify(snapshot, never()).putIntoScanSet(any(), ArgumentMatchers.any()); + verify(scanner).close(); + verify(snapshot, never()).putIntoReadSet(any(), any()); + verify(snapshot, never()).putIntoScanSet(any(), any()); } - @Test - public void scan_CalledTwice_SecondTimeShouldReturnTheSameFromSnapshot() - throws ExecutionException, CrudException { + @ParameterizedTest + @EnumSource(ScanType.class) + void scanOrGetScanner_CalledTwice_SecondTimeShouldReturnTheSameFromSnapshot(ScanType scanType) + throws ExecutionException, CrudException, IOException { // Arrange Scan originalScan = prepareScan(); Scan scanForStorage = toScanForStorageFrom(originalScan); @@ -406,7 +423,11 @@ public void scan_CalledTwice_SecondTimeShouldReturnTheSameFromSnapshot() Scan scan2 = prepareScan(); result = prepareResult(TransactionState.COMMITTED); TransactionResult expected = new TransactionResult(result); - when(scanner.iterator()).thenReturn(Collections.singletonList(result).iterator()); + if (scanType == ScanType.SCAN) { + when(scanner.iterator()).thenReturn(Collections.singletonList(result).iterator()); + } else { + when(scanner.one()).thenReturn(Optional.of(result)).thenReturn(Optional.empty()); + } when(storage.scan(scanForStorage)).thenReturn(scanner); Snapshot.Key key = new Snapshot.Key(scanForStorage, result); when(snapshot.getResults(scanForStorage)) @@ -415,10 +436,11 @@ public void scan_CalledTwice_SecondTimeShouldReturnTheSameFromSnapshot() when(snapshot.getResult(key)).thenReturn(Optional.of(expected)); // Act - List results1 = handler.scan(scan1); - List results2 = handler.scan(scan2); + List results1 = scanOrGetScanner(scan1, scanType); + List results2 = scanOrGetScanner(scan2, scanType); // Assert + verify(scanner).close(); verify(snapshot).putIntoReadSet(key, Optional.of(expected)); verify(snapshot) .putIntoScanSet(scanForStorage, Maps.newLinkedHashMap(ImmutableMap.of(key, expected))); @@ -430,9 +452,10 @@ public void scan_CalledTwice_SecondTimeShouldReturnTheSameFromSnapshot() verify(storage).scan(scanForStorage); } - @Test - public void scan_CalledTwiceUnderRealSnapshot_SecondTimeShouldReturnTheSameFromSnapshot() - throws ExecutionException, CrudException { + @ParameterizedTest + @EnumSource(ScanType.class) + void scan_CalledTwiceUnderRealSnapshot_SecondTimeShouldReturnTheSameFromSnapshot( + ScanType scanType) throws ExecutionException, CrudException, IOException { // Arrange Scan originalScan = prepareScan(); Scan scanForStorage = toScanForStorageFrom(originalScan); @@ -442,30 +465,41 @@ public void scan_CalledTwiceUnderRealSnapshot_SecondTimeShouldReturnTheSameFromS TransactionResult expected = new TransactionResult(result); snapshot = new Snapshot(ANY_TX_ID, Isolation.SNAPSHOT, tableMetadataManager, parallelExecutor); handler = new CrudHandler(storage, snapshot, tableMetadataManager, false, parallelExecutor); - when(scanner.iterator()).thenReturn(Collections.singletonList(result).iterator()); + if (scanType == ScanType.SCAN) { + when(scanner.iterator()).thenReturn(Collections.singletonList(result).iterator()); + } else { + when(scanner.one()).thenReturn(Optional.of(result)).thenReturn(Optional.empty()); + } when(storage.scan(scanForStorage)).thenReturn(scanner); // Act - List results1 = handler.scan(scan1); - List results2 = handler.scan(scan2); + List results1 = scanOrGetScanner(scan1, scanType); + List results2 = scanOrGetScanner(scan2, scanType); // Assert assertThat(results1.size()).isEqualTo(1); assertThat(results1.get(0)) .isEqualTo(new FilteredResult(expected, Collections.emptyList(), TABLE_METADATA, false)); assertThat(results1).isEqualTo(results2); + + verify(scanner).close(); verify(storage, never()).scan(originalScan); verify(storage).scan(scanForStorage); } - @Test - public void scan_GetCalledAfterScan_ShouldReturnFromStorage() - throws ExecutionException, CrudException { + @ParameterizedTest + @EnumSource(ScanType.class) + void scanOrGetScanner_GetCalledAfterScan_ShouldReturnFromStorage(ScanType scanType) + throws ExecutionException, CrudException, IOException { // Arrange Scan scan = prepareScan(); Scan scanForStorage = toScanForStorageFrom(scan); result = prepareResult(TransactionState.COMMITTED); - when(scanner.iterator()).thenReturn(Collections.singletonList(result).iterator()); + if (scanType == ScanType.SCAN) { + when(scanner.iterator()).thenReturn(Collections.singletonList(result).iterator()); + } else { + when(scanner.one()).thenReturn(Optional.of(result)).thenReturn(Optional.empty()); + } when(storage.scan(scanForStorage)).thenReturn(scanner); Get get = prepareGet(); Snapshot.Key key = new Snapshot.Key(get); @@ -476,46 +510,55 @@ public void scan_GetCalledAfterScan_ShouldReturnFromStorage() when(snapshot.getResult(key)).thenReturn(transactionResult); // Act - List results = handler.scan(scan); + List results = scanOrGetScanner(scan, scanType); Optional result = handler.get(get); // Assert verify(storage).scan(scanForStorage); - verify(storage).get(getForStorage); + verify(scanner).close(); + assertThat(results.size()).isEqualTo(1); assertThat(result).isPresent(); assertThat(results.get(0)).isEqualTo(result.get()); } - @Test - public void scan_GetCalledAfterScanUnderRealSnapshot_ShouldReturnFromStorage() - throws ExecutionException, CrudException { + @ParameterizedTest + @EnumSource(ScanType.class) + void scanOrGetScanner_GetCalledAfterScanUnderRealSnapshot_ShouldReturnFromStorage( + ScanType scanType) throws ExecutionException, CrudException, IOException { // Arrange Scan scan = toScanForStorageFrom(prepareScan()); result = prepareResult(TransactionState.COMMITTED); snapshot = new Snapshot(ANY_TX_ID, Isolation.SNAPSHOT, tableMetadataManager, parallelExecutor); handler = new CrudHandler(storage, snapshot, tableMetadataManager, false, parallelExecutor); - when(scanner.iterator()).thenReturn(Collections.singletonList(result).iterator()); + if (scanType == ScanType.SCAN) { + when(scanner.iterator()).thenReturn(Collections.singletonList(result).iterator()); + } else { + when(scanner.one()).thenReturn(Optional.of(result)).thenReturn(Optional.empty()); + } when(storage.scan(scan)).thenReturn(scanner); Get get = prepareGet(); when(storage.get(get)).thenReturn(Optional.of(result)); // Act - List results = handler.scan(scan); + List results = scanOrGetScanner(scan, scanType); Optional result = handler.get(get); // Assert verify(storage).scan(scan); verify(storage).get(get); + verify(scanner).close(); + assertThat(results.size()).isEqualTo(1); assertThat(result).isPresent(); assertThat(results.get(0)).isEqualTo(result.get()); } - @Test - public void scan_CalledAfterDeleteUnderRealSnapshot_ShouldThrowIllegalArgumentException() - throws ExecutionException, CrudException { + @ParameterizedTest + @EnumSource(ScanType.class) + void scanOrGetScanner_CalledAfterDeleteUnderRealSnapshot_ShouldThrowIllegalArgumentException( + ScanType scanType) throws ExecutionException, CrudException, IOException { // Arrange Scan scan = prepareScan(); Scan scanForStorage = toScanForStorageFrom(scan); @@ -551,9 +594,17 @@ public void scan_CalledAfterDeleteUnderRealSnapshot_ShouldThrowIllegalArgumentEx new ConcurrentHashMap<>(), new HashMap<>(), new HashMap<>(), - deleteSet); + deleteSet, + new ArrayList<>()); handler = new CrudHandler(storage, snapshot, tableMetadataManager, false, parallelExecutor); - when(scanner.iterator()).thenReturn(Arrays.asList(result, result2).iterator()); + if (scanType == ScanType.SCAN) { + when(scanner.iterator()).thenReturn(Arrays.asList(result, result2).iterator()); + } else { + when(scanner.one()) + .thenReturn(Optional.of(result)) + .thenReturn(Optional.of(result2)) + .thenReturn(Optional.empty()); + } when(storage.scan(scanForStorage)).thenReturn(scanner); Delete delete = @@ -568,26 +619,35 @@ public void scan_CalledAfterDeleteUnderRealSnapshot_ShouldThrowIllegalArgumentEx assertThat(deleteSet.size()).isEqualTo(1); assertThat(deleteSet).containsKey(new Snapshot.Key(delete)); - assertThatThrownBy(() -> handler.scan(scan)).isInstanceOf(IllegalArgumentException.class); + assertThatThrownBy(() -> scanOrGetScanner(scan, scanType)) + .isInstanceOf(IllegalArgumentException.class); + + verify(scanner).close(); } - @Test - public void - scan_CrossPartitionScanAndResultFromStorageGiven_ShouldUpdateSnapshotAndVerifyNoOverlapThenReturn() - throws ExecutionException, CrudException { + @ParameterizedTest + @EnumSource(ScanType.class) + void + scanOrGetScanner_CrossPartitionScanAndResultFromStorageGiven_ShouldUpdateSnapshotAndVerifyNoOverlapThenReturn( + ScanType scanType) throws ExecutionException, CrudException, IOException { // Arrange Scan scan = prepareCrossPartitionScan(); result = prepareResult(TransactionState.COMMITTED); Snapshot.Key key = new Snapshot.Key(scan, result); - when(scanner.iterator()).thenReturn(Collections.singletonList(result).iterator()); + if (scanType == ScanType.SCAN) { + when(scanner.iterator()).thenReturn(Collections.singletonList(result).iterator()); + } else { + when(scanner.one()).thenReturn(Optional.of(result)).thenReturn(Optional.empty()); + } when(storage.scan(any(ScanAll.class))).thenReturn(scanner); TransactionResult transactionResult = new TransactionResult(result); when(snapshot.getResult(key)).thenReturn(Optional.of(transactionResult)); // Act - List results = handler.scan(scan); + List results = scanOrGetScanner(scan, scanType); // Assert + verify(scanner).close(); verify(snapshot).putIntoReadSet(key, Optional.of(transactionResult)); verify(snapshot) .putIntoScanSet(scan, Maps.newLinkedHashMap(ImmutableMap.of(key, transactionResult))); @@ -598,18 +658,23 @@ public void scan_CalledAfterDeleteUnderRealSnapshot_ShouldThrowIllegalArgumentEx new FilteredResult(transactionResult, Collections.emptyList(), TABLE_METADATA, false)); } - @Test - public void - scan_CrossPartitionScanAndPreparedResultFromStorageGiven_ShouldNeverUpdateSnapshotNorVerifyNoOverlapButThrowUncommittedRecordException() - throws ExecutionException { + @ParameterizedTest + @EnumSource(ScanType.class) + void + scanOrGetScanner_CrossPartitionScanAndPreparedResultFromStorageGiven_ShouldNeverUpdateSnapshotNorVerifyNoOverlapButThrowUncommittedRecordException( + ScanType scanType) throws ExecutionException, IOException { // Arrange Scan scan = prepareCrossPartitionScan(); result = prepareResult(TransactionState.PREPARED); - when(scanner.iterator()).thenReturn(Collections.singletonList(result).iterator()); + if (scanType == ScanType.SCAN) { + when(scanner.iterator()).thenReturn(Collections.singletonList(result).iterator()); + } else { + when(scanner.one()).thenReturn(Optional.of(result)).thenReturn(Optional.empty()); + } when(storage.scan(any(ScanAll.class))).thenReturn(scanner); // Act Assert - assertThatThrownBy(() -> handler.scan(scan)) + assertThatThrownBy(() -> scanOrGetScanner(scan, scanType)) .isInstanceOf(UncommittedRecordException.class) .satisfies( e -> { @@ -619,14 +684,16 @@ public void scan_CalledAfterDeleteUnderRealSnapshot_ShouldThrowIllegalArgumentEx assertThat(exception.getResults().get(0)).isEqualTo(result); }); - verify(snapshot, never()).putIntoReadSet(any(Snapshot.Key.class), ArgumentMatchers.any()); + verify(scanner).close(); + verify(snapshot, never()).putIntoReadSet(any(Snapshot.Key.class), any()); + verify(snapshot, never()).putIntoScannerSet(any(Scan.class), any()); verify(snapshot, never()).verifyNoOverlap(any(), any()); } @Test public void scan_RuntimeExceptionCausedByExecutionExceptionThrownByIteratorHasNext_ShouldThrowCrudException() - throws ExecutionException { + throws ExecutionException, IOException { // Arrange Scan scan = prepareScan(); Scan scanForStorage = toScanForStorageFrom(scan); @@ -643,11 +710,13 @@ public void scan_CalledAfterDeleteUnderRealSnapshot_ShouldThrowIllegalArgumentEx assertThatThrownBy(() -> handler.scan(scan)) .isInstanceOf(CrudException.class) .hasCause(executionException); + + verify(scanner).close(); } @Test public void scan_RuntimeExceptionThrownByIteratorHasNext_ShouldThrowCrudException() - throws ExecutionException { + throws ExecutionException, IOException { // Arrange Scan scan = prepareScan(); Scan scanForStorage = toScanForStorageFrom(scan); @@ -662,6 +731,60 @@ public void scan_RuntimeExceptionThrownByIteratorHasNext_ShouldThrowCrudExceptio assertThatThrownBy(() -> handler.scan(scan)) .isInstanceOf(CrudException.class) .hasCause(runtimeException); + + verify(scanner).close(); + } + + @Test + public void getScanner_ExecutionExceptionThrownByScannerOne_ShouldThrowCrudException() + throws ExecutionException, IOException, CrudException { + // Arrange + Scan scan = prepareScan(); + Scan scanForStorage = toScanForStorageFrom(scan); + ExecutionException executionException = mock(ExecutionException.class); + when(scanner.one()).thenThrow(executionException); + when(storage.scan(scanForStorage)).thenReturn(scanner); + + // Act Assert + TransactionCrudOperable.Scanner actualScanner = handler.getScanner(scan); + assertThatThrownBy(actualScanner::one) + .isInstanceOf(CrudException.class) + .hasCause(executionException); + + verify(scanner).close(); + } + + @Test + public void + getScanner_ScannerNotFullyScanned_ShouldPutReadSetAndScannerSetInSnapshotAndVerifyScan() + throws ExecutionException, CrudException, IOException { + // Arrange + Scan scan = prepareScan(); + Scan scanForStorage = toScanForStorageFrom(scan); + Result result1 = prepareResult(TransactionState.COMMITTED); + Result result2 = prepareResult(TransactionState.COMMITTED); + Snapshot.Key key1 = new Snapshot.Key(scan, result1); + TransactionResult txResult1 = new TransactionResult(result1); + when(scanner.one()) + .thenReturn(Optional.of(result1)) + .thenReturn(Optional.of(result2)) + .thenReturn(Optional.empty()); + when(storage.scan(scanForStorage)).thenReturn(scanner); + + // Act + TransactionCrudOperable.Scanner actualScanner = handler.getScanner(scan); + Optional actualResult = actualScanner.one(); + actualScanner.close(); + + // Assert + verify(scanner).close(); + verify(snapshot).putIntoReadSet(key1, Optional.of(txResult1)); + verify(snapshot) + .putIntoScannerSet(scan, Maps.newLinkedHashMap(ImmutableMap.of(key1, txResult1))); + verify(snapshot).verifyNoOverlap(scan, ImmutableMap.of(key1, txResult1)); + + assertThat(actualResult) + .hasValue(new FilteredResult(txResult1, Collections.emptyList(), TABLE_METADATA, false)); } @Test @@ -1191,4 +1314,34 @@ public void readIfImplicitPreReadEnabled_ShouldCallAppropriateMethods() throws C assertThat(transactionIdCaptor.getValue()).isEqualTo(ANY_TX_ID); } + + private List scanOrGetScanner(Scan scan, ScanType scanType) throws CrudException { + if (scanType == ScanType.SCAN) { + return handler.scan(scan); + } + + try (TransactionCrudOperable.Scanner scanner = handler.getScanner(scan)) { + switch (scanType) { + case SCANNER_ONE: + List results = new ArrayList<>(); + while (true) { + Optional result = scanner.one(); + if (!result.isPresent()) { + return results; + } + results.add(result.get()); + } + case SCANNER_ALL: + return scanner.all(); + default: + throw new AssertionError(); + } + } + } + + enum ScanType { + SCAN, + SCANNER_ONE, + SCANNER_ALL + } } diff --git a/core/src/test/java/com/scalar/db/transaction/consensuscommit/SnapshotTest.java b/core/src/test/java/com/scalar/db/transaction/consensuscommit/SnapshotTest.java index 019df03c64..06b5d0aa1c 100644 --- a/core/src/test/java/com/scalar/db/transaction/consensuscommit/SnapshotTest.java +++ b/core/src/test/java/com/scalar/db/transaction/consensuscommit/SnapshotTest.java @@ -41,12 +41,14 @@ import com.scalar.db.io.Value; import com.scalar.db.transaction.consensuscommit.Snapshot.ReadWriteSets; import com.scalar.db.util.ScalarDbUtils; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.LinkedHashMap; +import java.util.List; import java.util.Map; import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; @@ -98,6 +100,7 @@ public class SnapshotTest { private Map> scanSet; private Map writeSet; private Map deleteSet; + private List scannerSet; @Mock private ConsensusCommitConfig config; @Mock private PrepareMutationComposer prepareComposer; @@ -122,6 +125,7 @@ private Snapshot prepareSnapshot(Isolation isolation) { scanSet = new HashMap<>(); writeSet = new HashMap<>(); deleteSet = new HashMap<>(); + scannerSet = new ArrayList<>(); return spy( new Snapshot( @@ -133,7 +137,8 @@ private Snapshot prepareSnapshot(Isolation isolation) { getSet, scanSet, writeSet, - deleteSet)); + deleteSet, + scannerSet)); } private TransactionResult prepareResult(String txId) { @@ -1614,6 +1619,33 @@ public void toSerializable_ScanWithLimitInScanSet_ShouldProcessWithoutExceptions verify(storage).scan(scanWithProjectionsWithoutLimit); } + @Test + public void toSerializable_ScannerSetNotChanged_ShouldProcessWithoutExceptions() + throws ExecutionException { + // Arrange + snapshot = prepareSnapshot(Isolation.SERIALIZABLE); + Scan scan = prepareScan(); + TransactionResult result1 = prepareResult(ANY_ID + "x", ANY_TEXT_1, ANY_TEXT_2); + TransactionResult result2 = prepareResult(ANY_ID + "x", ANY_TEXT_1, ANY_TEXT_3); + Snapshot.Key key1 = new Snapshot.Key(scan, result1); + snapshot.putIntoScannerSet(scan, Maps.newLinkedHashMap(ImmutableMap.of(key1, result1))); + DistributedStorage storage = mock(DistributedStorage.class); + Scan scanWithProjections = + Scan.newBuilder(scan).projections(Attribute.ID, ANY_NAME_1, ANY_NAME_2).build(); + Scanner scanner = mock(Scanner.class); + when(scanner.one()) + .thenReturn(Optional.of(result1)) + .thenReturn(Optional.of(result2)) + .thenReturn(Optional.empty()); + when(storage.scan(scanWithProjections)).thenReturn(scanner); + + // Act Assert + assertThatCode(() -> snapshot.toSerializable(storage)).doesNotThrowAnyException(); + + // Assert + verify(storage).scan(scanWithProjections); + } + @Test public void verifyNoOverlap_ScanGivenAndDeleteKeyAlreadyPresentInDeleteSet_ShouldThrowIllegalArgumentException() { diff --git a/core/src/test/java/com/scalar/db/transaction/consensuscommit/TwoPhaseConsensusCommitManagerTest.java b/core/src/test/java/com/scalar/db/transaction/consensuscommit/TwoPhaseConsensusCommitManagerTest.java index ba6c29ad56..880e7466c6 100644 --- a/core/src/test/java/com/scalar/db/transaction/consensuscommit/TwoPhaseConsensusCommitManagerTest.java +++ b/core/src/test/java/com/scalar/db/transaction/consensuscommit/TwoPhaseConsensusCommitManagerTest.java @@ -19,6 +19,8 @@ import com.scalar.db.api.Put; import com.scalar.db.api.Result; import com.scalar.db.api.Scan; +import com.scalar.db.api.TransactionCrudOperable; +import com.scalar.db.api.TransactionManagerCrudOperable; import com.scalar.db.api.TransactionState; import com.scalar.db.api.TwoPhaseCommitTransaction; import com.scalar.db.api.TwoPhaseCommitTransactionManager; @@ -40,6 +42,8 @@ import com.scalar.db.io.Key; import com.scalar.db.transaction.consensuscommit.Coordinator.State; import java.util.Arrays; +import java.util.Collections; +import java.util.Iterator; import java.util.List; import java.util.Optional; import org.junit.jupiter.api.BeforeEach; @@ -702,6 +706,404 @@ public void scan_ShouldScan() throws TransactionException { assertThat(actual).isEqualTo(results); } + @Test + public void getScannerAndScannerOne_ShouldReturnScannerAndReturnProperResult() throws Exception { + // Arrange + TwoPhaseCommitTransaction transaction = mock(TwoPhaseCommitTransaction.class); + + TwoPhaseConsensusCommitManager spied = spy(manager); + doReturn(transaction).when(spied).begin(); + + Scan scan = + Scan.newBuilder() + .namespace("ns") + .table("tbl") + .partitionKey(Key.ofText("p1", "val")) + .build(); + + TransactionCrudOperable.Scanner scanner = mock(TransactionCrudOperable.Scanner.class); + when(transaction.getScanner(scan)).thenReturn(scanner); + + Result result1 = mock(Result.class); + Result result2 = mock(Result.class); + Result result3 = mock(Result.class); + + when(scanner.one()) + .thenReturn(Optional.of(result1)) + .thenReturn(Optional.of(result2)) + .thenReturn(Optional.of(result3)) + .thenReturn(Optional.empty()); + + // Act Assert + TransactionManagerCrudOperable.Scanner actual = spied.getScanner(scan); + assertThat(actual.one()).hasValue(result1); + assertThat(actual.one()).hasValue(result2); + assertThat(actual.one()).hasValue(result3); + assertThat(actual.one()).isEmpty(); + actual.close(); + + verify(spied).begin(); + verify(transaction).prepare(); + verify(transaction).validate(); + verify(transaction).commit(); + verify(scanner).close(); + } + + @Test + public void getScannerAndScannerAll_ShouldReturnScannerAndReturnProperResults() throws Exception { + // Arrange + TwoPhaseCommitTransaction transaction = mock(TwoPhaseCommitTransaction.class); + + TwoPhaseConsensusCommitManager spied = spy(manager); + doReturn(transaction).when(spied).begin(); + + Scan scan = + Scan.newBuilder() + .namespace("ns") + .table("tbl") + .partitionKey(Key.ofText("p1", "val")) + .build(); + + TransactionCrudOperable.Scanner scanner = mock(TransactionCrudOperable.Scanner.class); + when(transaction.getScanner(scan)).thenReturn(scanner); + + Result result1 = mock(Result.class); + Result result2 = mock(Result.class); + Result result3 = mock(Result.class); + + when(scanner.all()) + .thenReturn(Arrays.asList(result1, result2, result3)) + .thenReturn(Collections.emptyList()); + + // Act Assert + TransactionManagerCrudOperable.Scanner actual = spied.getScanner(scan); + List results = actual.all(); + assertThat(results).containsExactly(result1, result2, result3); + assertThat(actual.all()).isEmpty(); + actual.close(); + + verify(spied).begin(); + verify(transaction).prepare(); + verify(transaction).validate(); + verify(transaction).commit(); + verify(scanner).close(); + } + + @Test + public void getScannerAndScannerIterator_ShouldReturnScannerAndReturnProperResults() + throws Exception { + // Arrange + TwoPhaseCommitTransaction transaction = mock(TwoPhaseCommitTransaction.class); + + TwoPhaseConsensusCommitManager spied = spy(manager); + doReturn(transaction).when(spied).begin(); + + Scan scan = + Scan.newBuilder() + .namespace("ns") + .table("tbl") + .partitionKey(Key.ofText("p1", "val")) + .build(); + + TransactionCrudOperable.Scanner scanner = mock(TransactionCrudOperable.Scanner.class); + when(transaction.getScanner(scan)).thenReturn(scanner); + + Result result1 = mock(Result.class); + Result result2 = mock(Result.class); + Result result3 = mock(Result.class); + + when(scanner.one()) + .thenReturn(Optional.of(result1)) + .thenReturn(Optional.of(result2)) + .thenReturn(Optional.of(result3)) + .thenReturn(Optional.empty()); + + // Act Assert + TransactionManagerCrudOperable.Scanner actual = spied.getScanner(scan); + + Iterator iterator = actual.iterator(); + assertThat(iterator.hasNext()).isTrue(); + assertThat(iterator.next()).isEqualTo(result1); + assertThat(iterator.hasNext()).isTrue(); + assertThat(iterator.next()).isEqualTo(result2); + assertThat(iterator.hasNext()).isTrue(); + assertThat(iterator.next()).isEqualTo(result3); + assertThat(iterator.hasNext()).isFalse(); + actual.close(); + + verify(spied).begin(); + verify(transaction).prepare(); + verify(transaction).validate(); + verify(transaction).commit(); + verify(scanner).close(); + } + + @Test + public void + getScanner_CrudExceptionThrownByTransactionGetScanner_ShouldRollbackTransactionAndThrowCrudException() + throws TransactionException { + // Arrange + TwoPhaseCommitTransaction transaction = mock(TwoPhaseCommitTransaction.class); + + TwoPhaseConsensusCommitManager spied = spy(manager); + doReturn(transaction).when(spied).begin(); + + Scan scan = + Scan.newBuilder() + .namespace("ns") + .table("tbl") + .partitionKey(Key.ofText("p1", "val")) + .build(); + + when(transaction.getScanner(scan)).thenThrow(CrudException.class); + + // Act Assert + assertThatThrownBy(() -> spied.getScanner(scan)).isInstanceOf(CrudException.class); + + verify(spied).begin(); + verify(transaction).rollback(); + } + + @Test + public void + getScannerAndScannerOne_CrudExceptionThrownByScannerOne_ShouldRollbackTransactionAndThrowCrudException() + throws TransactionException { + // Arrange + TwoPhaseCommitTransaction transaction = mock(TwoPhaseCommitTransaction.class); + + TwoPhaseConsensusCommitManager spied = spy(manager); + doReturn(transaction).when(spied).begin(); + + Scan scan = + Scan.newBuilder() + .namespace("ns") + .table("tbl") + .partitionKey(Key.ofText("p1", "val")) + .build(); + + TransactionCrudOperable.Scanner scanner = mock(TransactionCrudOperable.Scanner.class); + when(transaction.getScanner(scan)).thenReturn(scanner); + + when(scanner.one()).thenThrow(CrudException.class); + + // Act Assert + TransactionManagerCrudOperable.Scanner actual = spied.getScanner(scan); + assertThatThrownBy(actual::one).isInstanceOf(CrudException.class); + + verify(spied).begin(); + verify(scanner).close(); + verify(transaction).rollback(); + } + + @Test + public void + getScannerAndScannerAll_CrudExceptionThrownByScannerAll_ShouldRollbackTransactionAndThrowCrudException() + throws TransactionException { + // Arrange + TwoPhaseCommitTransaction transaction = mock(TwoPhaseCommitTransaction.class); + TwoPhaseConsensusCommitManager spied = spy(manager); + doReturn(transaction).when(spied).begin(); + + Scan scan = + Scan.newBuilder() + .namespace("ns") + .table("tbl") + .partitionKey(Key.ofText("p1", "val")) + .build(); + + TransactionCrudOperable.Scanner scanner = mock(TransactionCrudOperable.Scanner.class); + when(transaction.getScanner(scan)).thenReturn(scanner); + + when(scanner.all()).thenThrow(CrudException.class); + + // Act Assert + TransactionManagerCrudOperable.Scanner actual = spied.getScanner(scan); + assertThatThrownBy(actual::all).isInstanceOf(CrudException.class); + + verify(spied).begin(); + verify(scanner).close(); + verify(transaction).rollback(); + } + + @Test + public void + getScannerAndScannerClose_CrudExceptionThrownByScannerClose_ShouldRollbackTransactionAndThrowCrudException() + throws TransactionException { + // Arrange + TwoPhaseCommitTransaction transaction = mock(TwoPhaseCommitTransaction.class); + TwoPhaseConsensusCommitManager spied = spy(manager); + doReturn(transaction).when(spied).begin(); + + Scan scan = + Scan.newBuilder() + .namespace("ns") + .table("tbl") + .partitionKey(Key.ofText("p1", "val")) + .build(); + + TransactionCrudOperable.Scanner scanner = mock(TransactionCrudOperable.Scanner.class); + when(transaction.getScanner(scan)).thenReturn(scanner); + + doThrow(CrudException.class).when(scanner).close(); + + // Act Assert + TransactionManagerCrudOperable.Scanner actual = spied.getScanner(scan); + assertThatThrownBy(actual::close).isInstanceOf(CrudException.class); + + verify(spied).begin(); + verify(scanner).close(); + verify(transaction).rollback(); + } + + @Test + public void + getScannerAndScannerClose_PreparationConflictExceptionThrownByTransactionPrepare_ShouldRollbackTransactionAndThrowCrudConflictException() + throws TransactionException { + // Arrange + TwoPhaseCommitTransaction transaction = mock(TwoPhaseCommitTransaction.class); + + TwoPhaseConsensusCommitManager spied = spy(manager); + doReturn(transaction).when(spied).begin(); + doThrow(PreparationConflictException.class).when(transaction).prepare(); + + Scan scan = + Scan.newBuilder() + .namespace("ns") + .table("tbl") + .partitionKey(Key.ofText("p1", "val")) + .build(); + + TransactionCrudOperable.Scanner scanner = mock(TransactionCrudOperable.Scanner.class); + when(transaction.getScanner(scan)).thenReturn(scanner); + + // Act Assert + TransactionManagerCrudOperable.Scanner actual = spied.getScanner(scan); + assertThatThrownBy(actual::close).isInstanceOf(CrudConflictException.class); + + verify(spied).begin(); + verify(scanner).close(); + verify(transaction).rollback(); + } + + @Test + public void + getScannerAndScannerClose_ValidationConflictExceptionThrownByTransactionValidate_ShouldRollbackTransactionAndThrowCrudConflictException() + throws TransactionException { + // Arrange + TwoPhaseCommitTransaction transaction = mock(TwoPhaseCommitTransaction.class); + + TwoPhaseConsensusCommitManager spied = spy(manager); + doReturn(transaction).when(spied).begin(); + doThrow(ValidationConflictException.class).when(transaction).validate(); + + Scan scan = + Scan.newBuilder() + .namespace("ns") + .table("tbl") + .partitionKey(Key.ofText("p1", "val")) + .build(); + + TransactionCrudOperable.Scanner scanner = mock(TransactionCrudOperable.Scanner.class); + when(transaction.getScanner(scan)).thenReturn(scanner); + + // Act Assert + TransactionManagerCrudOperable.Scanner actual = spied.getScanner(scan); + assertThatThrownBy(actual::close).isInstanceOf(CrudConflictException.class); + + verify(spied).begin(); + verify(scanner).close(); + verify(transaction).rollback(); + } + + @Test + public void + getScannerAndScannerClose_CommitConflictExceptionThrownByTransactionCommit_ShouldRollbackTransactionAndThrowCrudConflictException() + throws TransactionException { + // Arrange + TwoPhaseCommitTransaction transaction = mock(TwoPhaseCommitTransaction.class); + + TwoPhaseConsensusCommitManager spied = spy(manager); + doReturn(transaction).when(spied).begin(); + doThrow(CommitConflictException.class).when(transaction).commit(); + + Scan scan = + Scan.newBuilder() + .namespace("ns") + .table("tbl") + .partitionKey(Key.ofText("p1", "val")) + .build(); + + TransactionCrudOperable.Scanner scanner = mock(TransactionCrudOperable.Scanner.class); + when(transaction.getScanner(scan)).thenReturn(scanner); + + // Act Assert + TransactionManagerCrudOperable.Scanner actual = spied.getScanner(scan); + assertThatThrownBy(actual::close).isInstanceOf(CrudConflictException.class); + + verify(spied).begin(); + verify(scanner).close(); + verify(transaction).rollback(); + } + + @Test + public void + getScannerAndScannerClose_UnknownTransactionStatusExceptionByTransactionCommit_ShouldThrowUnknownTransactionStatusException() + throws TransactionException { + // Arrange + TwoPhaseCommitTransaction transaction = mock(TwoPhaseCommitTransaction.class); + + TwoPhaseConsensusCommitManager spied = spy(manager); + doReturn(transaction).when(spied).begin(); + doThrow(UnknownTransactionStatusException.class).when(transaction).commit(); + + Scan scan = + Scan.newBuilder() + .namespace("ns") + .table("tbl") + .partitionKey(Key.ofText("p1", "val")) + .build(); + + TransactionCrudOperable.Scanner scanner = mock(TransactionCrudOperable.Scanner.class); + when(transaction.getScanner(scan)).thenReturn(scanner); + + // Act Assert + TransactionManagerCrudOperable.Scanner actual = spied.getScanner(scan); + assertThatThrownBy(actual::close).isInstanceOf(UnknownTransactionStatusException.class); + + verify(spied).begin(); + verify(scanner).close(); + } + + @Test + public void + getScannerAndScannerClose_CommitExceptionThrownByTransactionCommit_ShouldRollbackTransactionAndThrowCrudException() + throws TransactionException { + // Arrange + TwoPhaseCommitTransaction transaction = mock(TwoPhaseCommitTransaction.class); + + TwoPhaseConsensusCommitManager spied = spy(manager); + doReturn(transaction).when(spied).begin(); + doThrow(CommitException.class).when(transaction).commit(); + + Scan scan = + Scan.newBuilder() + .namespace("ns") + .table("tbl") + .partitionKey(Key.ofText("p1", "val")) + .build(); + + TransactionCrudOperable.Scanner scanner = mock(TransactionCrudOperable.Scanner.class); + when(transaction.getScanner(scan)).thenReturn(scanner); + + // Act Assert + TransactionManagerCrudOperable.Scanner actual = spied.getScanner(scan); + assertThatThrownBy(actual::close).isInstanceOf(CrudException.class); + + verify(spied).begin(); + verify(scanner).close(); + verify(transaction).rollback(); + } + @Test public void put_ShouldPut() throws TransactionException { // Arrange diff --git a/core/src/test/java/com/scalar/db/transaction/consensuscommit/TwoPhaseConsensusCommitTest.java b/core/src/test/java/com/scalar/db/transaction/consensuscommit/TwoPhaseConsensusCommitTest.java index bd12bbea43..0b80bf8b22 100644 --- a/core/src/test/java/com/scalar/db/transaction/consensuscommit/TwoPhaseConsensusCommitTest.java +++ b/core/src/test/java/com/scalar/db/transaction/consensuscommit/TwoPhaseConsensusCommitTest.java @@ -17,6 +17,7 @@ import com.scalar.db.api.Put; import com.scalar.db.api.Result; import com.scalar.db.api.Scan; +import com.scalar.db.api.TransactionCrudOperable; import com.scalar.db.api.TransactionState; import com.scalar.db.api.Update; import com.scalar.db.api.Upsert; @@ -28,6 +29,7 @@ import com.scalar.db.exception.transaction.PreparationConflictException; import com.scalar.db.exception.transaction.PreparationException; import com.scalar.db.exception.transaction.RollbackException; +import com.scalar.db.exception.transaction.TransactionException; import com.scalar.db.exception.transaction.UnknownTransactionStatusException; import com.scalar.db.exception.transaction.UnsatisfiedConditionException; import com.scalar.db.exception.transaction.ValidationException; @@ -65,7 +67,10 @@ public class TwoPhaseConsensusCommitTest { public void setUp() throws Exception { MockitoAnnotations.openMocks(this).close(); + // Arrange transaction = new TwoPhaseConsensusCommit(crud, commit, recovery, mutationOperationChecker); + + when(crud.areAllScannersClosed()).thenReturn(true); } private Get prepareGet() { @@ -166,6 +171,93 @@ public void scan_ScanForUncommittedRecordGiven_ShouldRecoverRecord() throws Crud verify(recovery).recover(scan, result); } + @Test + public void getScannerAndScannerOne_ShouldCallCrudHandlerGetScannerAndScannerOne() + throws CrudException { + // Arrange + Scan scan = prepareScan(); + TransactionCrudOperable.Scanner scanner = mock(TransactionCrudOperable.Scanner.class); + Result result = mock(Result.class); + when(scanner.one()).thenReturn(Optional.of(result)); + when(crud.getScanner(scan)).thenReturn(scanner); + + // Act + TransactionCrudOperable.Scanner actualScanner = transaction.getScanner(scan); + Optional actualResult = actualScanner.one(); + + // Assert + assertThat(actualResult).hasValue(result); + verify(crud).getScanner(scan); + verify(scanner).one(); + } + + @Test + public void + getScannerAndScannerOne_UncommittedRecordExceptionThrownByScannerOne_ShouldRecoverRecord() + throws CrudException { + // Arrange + Scan scan = prepareScan(); + + UncommittedRecordException toThrow = mock(UncommittedRecordException.class); + TransactionResult result = mock(TransactionResult.class); + when(toThrow.getSelection()).thenReturn(scan); + when(toThrow.getResults()).thenReturn(Collections.singletonList(result)); + + TransactionCrudOperable.Scanner scanner = mock(TransactionCrudOperable.Scanner.class); + when(scanner.one()).thenThrow(toThrow); + when(crud.getScanner(scan)).thenReturn(scanner); + + // Act Assert + TransactionCrudOperable.Scanner actualScanner = transaction.getScanner(scan); + assertThatThrownBy(actualScanner::one).isInstanceOf(UncommittedRecordException.class); + + verify(recovery).recover(scan, result); + } + + @Test + public void getScannerAndScannerAll_ShouldCallCrudHandlerGetScannerAndScannerAll() + throws CrudException { + // Arrange + Scan scan = prepareScan(); + TransactionCrudOperable.Scanner scanner = mock(TransactionCrudOperable.Scanner.class); + Result result1 = mock(Result.class); + Result result2 = mock(Result.class); + when(scanner.all()).thenReturn(Arrays.asList(result1, result2)); + when(crud.getScanner(scan)).thenReturn(scanner); + + // Act + TransactionCrudOperable.Scanner actualScanner = transaction.getScanner(scan); + List actualResults = actualScanner.all(); + + // Assert + assertThat(actualResults).containsExactly(result1, result2); + verify(crud).getScanner(scan); + verify(scanner).all(); + } + + @Test + public void + getScannerAndScannerAll_UncommittedRecordExceptionThrownByScannerAll_ShouldRecoverRecord() + throws CrudException { + // Arrange + Scan scan = prepareScan(); + + UncommittedRecordException toThrow = mock(UncommittedRecordException.class); + TransactionResult result = mock(TransactionResult.class); + when(toThrow.getSelection()).thenReturn(scan); + when(toThrow.getResults()).thenReturn(Collections.singletonList(result)); + + TransactionCrudOperable.Scanner scanner = mock(TransactionCrudOperable.Scanner.class); + when(scanner.all()).thenThrow(toThrow); + when(crud.getScanner(scan)).thenReturn(scanner); + + // Act Assert + TransactionCrudOperable.Scanner actualScanner = transaction.getScanner(scan); + assertThatThrownBy(actualScanner::all).isInstanceOf(UncommittedRecordException.class); + + verify(recovery).recover(scan, result); + } + @Test public void put_PutGiven_ShouldCallCrudHandlerPut() throws ExecutionException, CrudException { // Arrange @@ -673,6 +765,15 @@ public void prepare_ProcessedCrudGiven_ShouldPrepareWithSnapshot() assertThatThrownBy(transaction::prepare).isInstanceOf(PreparationException.class); } + @Test + public void prepare_ScannerNotClosed_ShouldThrowIllegalStateException() { + // Arrange + when(crud.areAllScannersClosed()).thenReturn(false); + + // Act Assert + assertThatThrownBy(() -> transaction.prepare()).isInstanceOf(IllegalStateException.class); + } + @Test public void validate_ProcessedCrudGiven_ShouldPerformValidationWithSnapshot() throws ValidationException, PreparationException { @@ -735,8 +836,7 @@ public void commit_SerializableUsedAndPreparedState_ShouldThrowIllegalStateExcep } @Test - public void rollback_ShouldAbortStateAndRollbackRecords() - throws RollbackException, UnknownTransactionStatusException, PreparationException { + public void rollback_ShouldAbortStateAndRollbackRecords() throws TransactionException { // Arrange transaction.prepare(); when(crud.getSnapshot()).thenReturn(snapshot); @@ -745,13 +845,14 @@ public void rollback_ShouldAbortStateAndRollbackRecords() transaction.rollback(); // Assert + verify(crud).closeScanners(); verify(commit).abortState(snapshot.getId()); verify(commit).rollbackRecords(snapshot); } @Test public void rollback_CalledAfterPrepareFails_ShouldAbortStateAndRollbackRecords() - throws PreparationException, UnknownTransactionStatusException, RollbackException { + throws TransactionException { // Arrange when(crud.getSnapshot()).thenReturn(snapshot); doThrow(PreparationException.class).when(commit).prepare(snapshot); @@ -761,14 +862,14 @@ public void rollback_CalledAfterPrepareFails_ShouldAbortStateAndRollbackRecords( transaction.rollback(); // Assert + verify(crud).closeScanners(); verify(commit).abortState(snapshot.getId()); verify(commit).rollbackRecords(snapshot); } @Test public void rollback_CalledAfterCommitFails_ShouldNeverAbortStateAndRollbackRecords() - throws CommitException, UnknownTransactionStatusException, RollbackException, - PreparationException { + throws TransactionException { // Arrange transaction.prepare(); when(crud.getSnapshot()).thenReturn(snapshot); @@ -779,6 +880,7 @@ public void rollback_CalledAfterCommitFails_ShouldNeverAbortStateAndRollbackReco transaction.rollback(); // Assert + verify(crud).closeScanners(); verify(commit, never()).abortState(snapshot.getId()); verify(commit, never()).rollbackRecords(snapshot); } @@ -786,7 +888,7 @@ public void rollback_CalledAfterCommitFails_ShouldNeverAbortStateAndRollbackReco @Test public void rollback_UnknownTransactionStatusExceptionThrownByAbortState_ShouldThrowRollbackException() - throws UnknownTransactionStatusException, PreparationException { + throws TransactionException { // Arrange transaction.prepare(); when(crud.getSnapshot()).thenReturn(snapshot); @@ -795,12 +897,13 @@ public void rollback_CalledAfterCommitFails_ShouldNeverAbortStateAndRollbackReco // Act Assert assertThatThrownBy(transaction::rollback).isInstanceOf(RollbackException.class); + verify(crud).closeScanners(); verify(commit, never()).rollbackRecords(snapshot); } @Test public void rollback_CommittedStateReturnedByAbortState_ShouldThrowRollbackException() - throws UnknownTransactionStatusException, PreparationException { + throws TransactionException { // Arrange transaction.prepare(); when(crud.getSnapshot()).thenReturn(snapshot); @@ -809,6 +912,7 @@ public void rollback_CommittedStateReturnedByAbortState_ShouldThrowRollbackExcep // Act Assert assertThatThrownBy(transaction::rollback).isInstanceOf(RollbackException.class); + verify(crud).closeScanners(); verify(commit, never()).rollbackRecords(snapshot); } } diff --git a/integration-test/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitIntegrationTestBase.java b/integration-test/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitIntegrationTestBase.java index 9854894ac4..bf5abaae05 100644 --- a/integration-test/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitIntegrationTestBase.java +++ b/integration-test/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitIntegrationTestBase.java @@ -15,7 +15,6 @@ import com.scalar.db.io.Key; import java.util.Optional; import java.util.Properties; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; public abstract class ConsensusCommitIntegrationTestBase @@ -930,14 +929,4 @@ public void deleteAndDelete_forSameRecord_shouldWorkCorrectly() throws Transacti Optional optResult = get(prepareGet(0, 0)); assertThat(optResult).isNotPresent(); } - - @Disabled("Implement later") - @Override - @Test - public void getScanner_ScanGivenForCommittedRecord_ShouldReturnRecords() {} - - @Disabled("Implement later") - @Override - @Test - public void manager_getScanner_ScanGivenForCommittedRecord_ShouldReturnRecords() {} } diff --git a/integration-test/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitSpecificIntegrationTestBase.java b/integration-test/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitSpecificIntegrationTestBase.java index d552d704e3..e4a31e9673 100644 --- a/integration-test/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitSpecificIntegrationTestBase.java +++ b/integration-test/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitSpecificIntegrationTestBase.java @@ -32,6 +32,7 @@ import com.scalar.db.api.ScanAll; import com.scalar.db.api.Selection; import com.scalar.db.api.TableMetadata; +import com.scalar.db.api.TransactionCrudOperable; import com.scalar.db.api.TransactionState; import com.scalar.db.api.Update; import com.scalar.db.config.DatabaseConfig; @@ -4184,6 +4185,220 @@ public void get_GetWithIndexGiven_WithSerializable_ShouldNotThrowAnyException() assertThatCode(transaction::commit).doesNotThrowAnyException(); } + @Test + public void getScanner_WithSerializable_ShouldNotThrowAnyException() throws TransactionException { + // Arrange + manager.mutate( + Arrays.asList( + Insert.newBuilder() + .namespace(namespace1) + .table(TABLE_1) + .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) + .intValue(BALANCE, INITIAL_BALANCE) + .build(), + Insert.newBuilder() + .namespace(namespace1) + .table(TABLE_1) + .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 1)) + .intValue(BALANCE, INITIAL_BALANCE) + .build(), + Insert.newBuilder() + .namespace(namespace1) + .table(TABLE_1) + .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 2)) + .intValue(BALANCE, INITIAL_BALANCE) + .build())); + + Scan scan = prepareScan(0, namespace1, TABLE_1); + DistributedTransaction transaction = manager.begin(Isolation.SERIALIZABLE); + + // Act Assert + TransactionCrudOperable.Scanner scanner = transaction.getScanner(scan); + Optional result1 = scanner.one(); + assertThat(result1).isNotEmpty(); + assertThat(result1.get().getInt(ACCOUNT_ID)).isEqualTo(0); + assertThat(result1.get().getInt(ACCOUNT_TYPE)).isEqualTo(0); + assertThat(result1.get().getInt(BALANCE)).isEqualTo(INITIAL_BALANCE); + + Optional result2 = scanner.one(); + assertThat(result2).isNotEmpty(); + assertThat(result2.get().getInt(ACCOUNT_ID)).isEqualTo(0); + assertThat(result2.get().getInt(ACCOUNT_TYPE)).isEqualTo(1); + assertThat(result2.get().getInt(BALANCE)).isEqualTo(INITIAL_BALANCE); + + scanner.close(); + + assertThatCode(transaction::commit).doesNotThrowAnyException(); + } + + @Test + public void + getScanner_FirstInsertedRecordByAnotherTransaction_WithSerializable_ShouldNotThrowCommitConflictException() + throws TransactionException { + // Arrange + manager.mutate( + Arrays.asList( + Insert.newBuilder() + .namespace(namespace1) + .table(TABLE_1) + .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 1)) + .intValue(BALANCE, INITIAL_BALANCE) + .build(), + Insert.newBuilder() + .namespace(namespace1) + .table(TABLE_1) + .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 2)) + .intValue(BALANCE, INITIAL_BALANCE) + .build())); + + Scan scan = prepareScan(0, namespace1, TABLE_1); + DistributedTransaction transaction = manager.begin(Isolation.SERIALIZABLE); + + // Act Assert + TransactionCrudOperable.Scanner scanner = transaction.getScanner(scan); + Optional result1 = scanner.one(); + assertThat(result1).isNotEmpty(); + assertThat(result1.get().getInt(ACCOUNT_ID)).isEqualTo(0); + assertThat(result1.get().getInt(ACCOUNT_TYPE)).isEqualTo(1); + assertThat(result1.get().getInt(BALANCE)).isEqualTo(INITIAL_BALANCE); + + Optional result2 = scanner.one(); + assertThat(result2).isNotEmpty(); + assertThat(result2.get().getInt(ACCOUNT_ID)).isEqualTo(0); + assertThat(result2.get().getInt(ACCOUNT_TYPE)).isEqualTo(2); + assertThat(result2.get().getInt(BALANCE)).isEqualTo(INITIAL_BALANCE); + + scanner.close(); + + DistributedTransaction another = manager.begin(Isolation.SERIALIZABLE); + another.insert( + Insert.newBuilder() + .namespace(namespace1) + .table(TABLE_1) + .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) + .intValue(BALANCE, INITIAL_BALANCE) + .build()); + another.commit(); + + assertThatThrownBy(transaction::commit).isInstanceOf(CommitConflictException.class); + } + + @Test + public void + getScanner_RecordInsertedByAnotherTransaction_WithSerializable_ShouldNotThrowAnyException() + throws TransactionException { + // Arrange + manager.mutate( + Arrays.asList( + Insert.newBuilder() + .namespace(namespace1) + .table(TABLE_1) + .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) + .intValue(BALANCE, INITIAL_BALANCE) + .build(), + Insert.newBuilder() + .namespace(namespace1) + .table(TABLE_1) + .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 1)) + .intValue(BALANCE, INITIAL_BALANCE) + .build())); + + Scan scan = prepareScan(0, namespace1, TABLE_1); + DistributedTransaction transaction = manager.begin(Isolation.SERIALIZABLE); + + // Act Assert + TransactionCrudOperable.Scanner scanner = transaction.getScanner(scan); + Optional result1 = scanner.one(); + assertThat(result1).isNotEmpty(); + assertThat(result1.get().getInt(ACCOUNT_ID)).isEqualTo(0); + assertThat(result1.get().getInt(ACCOUNT_TYPE)).isEqualTo(0); + assertThat(result1.get().getInt(BALANCE)).isEqualTo(INITIAL_BALANCE); + + Optional result2 = scanner.one(); + assertThat(result2).isNotEmpty(); + assertThat(result2.get().getInt(ACCOUNT_ID)).isEqualTo(0); + assertThat(result2.get().getInt(ACCOUNT_TYPE)).isEqualTo(1); + assertThat(result2.get().getInt(BALANCE)).isEqualTo(INITIAL_BALANCE); + + scanner.close(); + + DistributedTransaction another = manager.begin(Isolation.SERIALIZABLE); + another.insert( + Insert.newBuilder() + .namespace(namespace1) + .table(TABLE_1) + .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 2)) + .intValue(BALANCE, INITIAL_BALANCE) + .build()); + another.commit(); + + assertThatCode(transaction::commit).doesNotThrowAnyException(); + } + + @Test + public void + getScanner_RecordUpdatedByAnotherTransaction_WithSerializable_ShouldThrowCommitConflictException() + throws TransactionException { + // Arrange + manager.mutate( + Arrays.asList( + Insert.newBuilder() + .namespace(namespace1) + .table(TABLE_1) + .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) + .intValue(BALANCE, INITIAL_BALANCE) + .build(), + Insert.newBuilder() + .namespace(namespace1) + .table(TABLE_1) + .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 1)) + .intValue(BALANCE, INITIAL_BALANCE) + .build())); + + Scan scan = prepareScan(0, namespace1, TABLE_1); + DistributedTransaction transaction = manager.begin(Isolation.SERIALIZABLE); + + // Act Assert + TransactionCrudOperable.Scanner scanner = transaction.getScanner(scan); + Optional result1 = scanner.one(); + assertThat(result1).isNotEmpty(); + assertThat(result1.get().getInt(ACCOUNT_ID)).isEqualTo(0); + assertThat(result1.get().getInt(ACCOUNT_TYPE)).isEqualTo(0); + assertThat(result1.get().getInt(BALANCE)).isEqualTo(INITIAL_BALANCE); + + Optional result2 = scanner.one(); + assertThat(result2).isNotEmpty(); + assertThat(result2.get().getInt(ACCOUNT_ID)).isEqualTo(0); + assertThat(result2.get().getInt(ACCOUNT_TYPE)).isEqualTo(1); + assertThat(result2.get().getInt(BALANCE)).isEqualTo(INITIAL_BALANCE); + + scanner.close(); + + DistributedTransaction another = manager.begin(Isolation.SERIALIZABLE); + another.update( + Update.newBuilder() + .namespace(namespace1) + .table(TABLE_1) + .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) + .intValue(BALANCE, 0) + .build()); + another.commit(); + + assertThatThrownBy(transaction::commit).isInstanceOf(CommitConflictException.class); + } + @Test public void get_GetWithIndexGiven_NoRecordsInIndexRange_WithSerializable_ShouldNotThrowAnyException() diff --git a/integration-test/src/main/java/com/scalar/db/transaction/consensuscommit/TwoPhaseConsensusCommitIntegrationTestBase.java b/integration-test/src/main/java/com/scalar/db/transaction/consensuscommit/TwoPhaseConsensusCommitIntegrationTestBase.java index d57c31302c..781087ed79 100644 --- a/integration-test/src/main/java/com/scalar/db/transaction/consensuscommit/TwoPhaseConsensusCommitIntegrationTestBase.java +++ b/integration-test/src/main/java/com/scalar/db/transaction/consensuscommit/TwoPhaseConsensusCommitIntegrationTestBase.java @@ -2,8 +2,6 @@ import com.scalar.db.api.TwoPhaseCommitTransactionIntegrationTestBase; import java.util.Properties; -import org.junit.jupiter.api.Disabled; -import org.junit.jupiter.api.Test; public abstract class TwoPhaseConsensusCommitIntegrationTestBase extends TwoPhaseCommitTransactionIntegrationTestBase { @@ -40,14 +38,4 @@ protected final Properties getProperties2(String testName) { protected Properties getProps2(String testName) { return getProps1(testName); } - - @Disabled("Implement later") - @Override - @Test - public void getScanner_ScanGivenForCommittedRecord_ShouldReturnRecords() {} - - @Disabled("Implement later") - @Override - @Test - public void manager_getScanner_ScanGivenForCommittedRecord_ShouldReturnRecords() {} }