Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions core/src/main/java/com/scalar/db/common/error/CoreError.java
Original file line number Diff line number Diff line change
Expand Up @@ -911,6 +911,18 @@ public enum CoreError implements ScalarDbError {
Category.USER_ERROR, "0203", "Delimiter must not be null", "", ""),
DATA_LOADER_CONFIG_FILE_PATH_BLANK(
Category.USER_ERROR, "0204", "Config file path must not be blank", "", ""),
CONSENSUS_COMMIT_SCANNER_NOT_CLOSED(
Category.USER_ERROR,
"0205",
"Some scanners were not closed. All scanners must be closed before committing the transaction.",
"",
""),
TWO_PHASE_CONSENSUS_COMMIT_SCANNER_NOT_CLOSED(
Category.USER_ERROR,
"0206",
"Some scanners were not closed. All scanners must be closed before preparing the transaction.",
"",
""),

//
// Errors for the concurrency error category
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,10 @@
import com.scalar.db.exception.transaction.UnsatisfiedConditionException;
import com.scalar.db.util.ScalarDbUtils;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.annotation.concurrent.NotThreadSafe;
import org.slf4j.Logger;
Expand Down Expand Up @@ -98,7 +100,41 @@ public List<Result> scan(Scan scan) throws CrudException {

@Override
public Scanner getScanner(Scan scan) throws CrudException {
throw new UnsupportedOperationException("Implement later");
scan = copyAndSetTargetToIfNot(scan);
Scanner scanner = crud.getScanner(scan);

return new Scanner() {
@Override
public Optional<Result> one() throws CrudException {
try {
return scanner.one();
} catch (UncommittedRecordException e) {
lazyRecovery(e);
throw e;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[minor] Is there any reason not to close scanner here while ConsensusCommitManager.getScanner() closes that?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The reason is probably ConsensusCommitManager commits after one scan, whereas ConsensusCommit commits after doing several operations?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(Oops, sorry I thought here was in catch (CrudException e) { ... } block).

I just thought we can immediately close the failed scanner as well as ConsensusCommitManager.getScanner(), for consistency and a bit faster resource release. It's just out of curiosity.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, sorry for the lack of explanation.

This anonymous Scanner class wraps the scanner instance returned by CrudHandler here:
https://github.com/scalar-labs/scalardb/pull/2711/files#diff-ec7fb790c921b21b0690f23d557c721585b0c83ef32b729e7dada7009d6142d0R104

The CrudHandler.getScanner() method returns an instance of ConsensusCommitStorageScanner if the scan result is not present in the snapshot.

The ConsensusCommitStorageScanner.one() method immediately closes the underlying scanner when a failure occurs:
https://github.com/scalar-labs/scalardb/pull/2711/files#diff-acc93f1366b03168017001646c7217f7dd1f3c7cbe8a41b6a9a8484dc5ef0905R424-R452

So, the behavior is consistent with ConsensusCommitManager.getScanner().

By the way, the anonymous Scanner class simply executes lazy recovery when an UncommittedRecordException is caught.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the explanation! Sounds good 💯

}
}

@Override
public List<Result> all() throws CrudException {
try {
return scanner.all();
} catch (UncommittedRecordException e) {
lazyRecovery(e);
throw e;
}
}

@Override
public void close() throws CrudException {
scanner.close();
}

@Nonnull
@Override
public Iterator<Result> iterator() {
return scanner.iterator();
}
};
}

/** @deprecated As of release 3.13.0. Will be removed in release 5.0.0. */
Expand Down Expand Up @@ -213,6 +249,10 @@ public void mutate(List<? extends Mutation> mutations) throws CrudException {

@Override
public void commit() throws CommitException, UnknownTransactionStatusException {
if (!crud.areAllScannersClosed()) {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Check if all the scanners are closed before committing a transaction.

throw new IllegalStateException(CoreError.CONSENSUS_COMMIT_SCANNER_NOT_CLOSED.buildMessage());
}

// Execute implicit pre-read
try {
crud.readIfImplicitPreReadEnabled();
Expand All @@ -234,6 +274,12 @@ public void commit() throws CommitException, UnknownTransactionStatusException {

@Override
public void rollback() {
try {
crud.closeScanners();
} catch (CrudException e) {
logger.warn("Failed to close the scanner", e);
}

if (groupCommitter != null) {
groupCommitter.remove(crud.getSnapshot().getId());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@
import com.scalar.db.api.Put;
import com.scalar.db.api.Result;
import com.scalar.db.api.Scan;
import com.scalar.db.api.TransactionCrudOperable;
import com.scalar.db.api.TransactionState;
import com.scalar.db.api.Update;
import com.scalar.db.api.Upsert;
import com.scalar.db.common.AbstractDistributedTransactionManager;
import com.scalar.db.common.AbstractTransactionManagerCrudOperableScanner;
import com.scalar.db.config.DatabaseConfig;
import com.scalar.db.exception.transaction.CommitConflictException;
import com.scalar.db.exception.transaction.CrudConflictException;
Expand All @@ -34,6 +36,7 @@
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
import org.slf4j.Logger;
Expand Down Expand Up @@ -231,9 +234,86 @@ public List<Result> scan(Scan scan) throws CrudException, UnknownTransactionStat

@Override
public Scanner getScanner(Scan scan) throws CrudException {
throw new UnsupportedOperationException("Implement later");
DistributedTransaction transaction = begin();

TransactionCrudOperable.Scanner scanner;
try {
scanner = transaction.getScanner(copyAndSetTargetToIfNot(scan));
} catch (CrudException e) {
rollbackTransaction(transaction);
throw e;
}

return new AbstractTransactionManagerCrudOperableScanner() {

private final AtomicBoolean closed = new AtomicBoolean();

@Override
public Optional<Result> one() throws CrudException {
try {
return scanner.one();
} catch (CrudException e) {
closed.set(true);

try {
scanner.close();
} catch (CrudException ex) {
e.addSuppressed(ex);
}

rollbackTransaction(transaction);
throw e;
}
}

@Override
public List<Result> all() throws CrudException {
try {
return scanner.all();
} catch (CrudException e) {
closed.set(true);

try {
scanner.close();
} catch (CrudException ex) {
e.addSuppressed(ex);
}

rollbackTransaction(transaction);
throw e;
}
}

@Override
public void close() throws CrudException, UnknownTransactionStatusException {
if (closed.get()) {
return;
}
closed.set(true);

try {
scanner.close();
} catch (CrudException e) {
rollbackTransaction(transaction);
throw e;
}

try {
transaction.commit();
} catch (CommitConflictException e) {
rollbackTransaction(transaction);
throw new CrudConflictException(e.getMessage(), e, e.getTransactionId().orElse(null));
} catch (UnknownTransactionStatusException e) {
throw e;
} catch (TransactionException e) {
rollbackTransaction(transaction);
throw new CrudException(e.getMessage(), e, e.getTransactionId().orElse(null));
}
}
};
}

/** @deprecated As of release 3.13.0. Will be removed in release 5.0.0. */
@Deprecated
@Override
public void put(Put put) throws CrudException, UnknownTransactionStatusException {
Expand All @@ -244,6 +324,7 @@ public void put(Put put) throws CrudException, UnknownTransactionStatusException
});
}

/** @deprecated As of release 3.13.0. Will be removed in release 5.0.0. */
@Deprecated
@Override
public void put(List<Put> puts) throws CrudException, UnknownTransactionStatusException {
Expand Down Expand Up @@ -290,6 +371,7 @@ public void delete(Delete delete) throws CrudException, UnknownTransactionStatus
});
}

/** @deprecated As of release 3.13.0. Will be removed in release 5.0.0. */
@Deprecated
@Override
public void delete(List<Delete> deletes) throws CrudException, UnknownTransactionStatusException {
Expand Down
Loading