Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,32 @@ private ServiceStateHashes getServiceStateHashes(ServiceWrapper service, Snapsho
.build();
}

/**
* Performs the before commit operation for all services in the runtime.
*
* @param fork a fork allowing the runtime and the service to modify the database state.
* Must allow checkpoints and rollbacks.
*/
public void beforeCommit(Fork fork) {
synchronized (lock) {
try {
for (ServiceWrapper service : services.values()) {
fork.createCheckpoint();
try {
service.beforeCommit(fork);
} catch (Exception e) {
logger.error("Service {} threw exception in beforeCommit. Any changes are rolled-back",
service.getName(), e);
fork.rollback();
}
}
} catch (Exception e) {
logger.error("Unexpected exception in beforeCommit", e);
throw e;
}
}
}

/**
* Notifies the services in the runtime of the block commit event.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,23 @@ byte[] getStateHashes(long snapshotHandle) throws CloseFailuresException {
}
}

/**
* Performs the before commit operation for services in this runtime.
*
* @param forkHandle a handle to the native fork object, which must support checkpoints
* and rollbacks
* @throws CloseFailuresException if there was a failure in destroying some native peers
* @see ServiceRuntime#beforeCommit(Fork)
*/
void beforeCommit(long forkHandle) throws CloseFailuresException {
try (Cleaner cleaner = new Cleaner("beforeCommit")) {
Fork fork = viewFactory.createFork(forkHandle, cleaner);
serviceRuntime.beforeCommit(fork);
} catch (CloseFailuresException e) {
handleCloseFailure(e);
}
}

/**
* Notifies the runtime of the block commit event.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,10 @@ List<HashCode> getStateHashes(Snapshot snapshot) {
return service.getStateHashes(snapshot);
}

void beforeCommit(Fork fork) {
service.beforeCommit(fork);
}

void afterCommit(BlockCommittedEvent event) {
service.afterCommit(event);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,21 @@ default void initialize(Fork fork, Configuration configuration) {
*/
void createPublicApiHandlers(Node node, Router router);

/**
* Handles the changes made by all transactions included in the upcoming block.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@alekseysidorov would you check please if the user-facing documentation is accurate?

* This handler is an optional callback method invoked by the blockchain after all transactions
* in a block are executed, but before it is committed. The service can modify its state
* in this handler, therefore, implementations must be deterministic and use only the current
* database state as their input.
*
* <p>This method is invoked synchronously from the thread that commits the block, therefore,
* implementations of this method must not perform any blocking or long-running operations.
*
* <p>Any exceptions in this method will revert any changes made to the database by it,
* but will not affect the processing of this block.
*/
default void beforeCommit(Fork fork) {}

/**
* Handles read-only block commit event. This handler is an optional callback method which is
* invoked by the blockchain after each block commit. For example, a service can create one or
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,9 +152,20 @@ NativeHandle intoPatch() {
}

/**
* Creates in-memory checkpoint that can be used to rollback changes.
* Creates in-memory checkpoint of the current state of this Fork. A checkpoint allows to restore
* the state of the Fork by reverting the changes made since the last checkpoint operation with
* {@link #rollback()}. The changes made <em>before</em> the last checkpoint cannot be reverted,
* because each new checkpoint replaces the previous: checkpoints are not stacked.
*
* <p>Creating a checkpoint will invalidate all collections that were instantiated with this fork.
*
* <p>This operation is not intended to be used by services.
*/
void createCheckpoint() {
public void createCheckpoint() {
// As stacked (nested) checkpoints are not supported, this operation must not be used by
// the client code, because in case of an exception it will make the framework
// unable to revert the changes made by the service before the service created
// a checkpoint: ECR-3611
checkState(nativeCanRollback(getNativeHandle()),
"This fork does not support checkpoints");

Expand All @@ -164,12 +175,15 @@ void createCheckpoint() {
}

/**
* Rollbacks changes to the latest checkpoint. Affects only changes made with
* this particular Fork instance.
* Rollbacks changes to the latest checkpoint. If no checkpoints were created, rollbacks all
* changes made by this fork. Rollback affects only changes made with this particular
* Fork instance.
*
* <p>Rollback will invalidate all collections that were created with this fork.
*
* <p>If no checkpoints was created, rollbacks all changes made by this fork.
* <p>This operation is not intended to be used by services.
*/
void rollback() {
public void rollback() {
checkState(nativeCanRollback(getNativeHandle()),
"This fork does not support rollbacks");

Expand Down Expand Up @@ -221,6 +235,15 @@ private void replaceIndexCleaner() {
*/
private static native long nativeIntoPatch(long nativeHandle);

/**
* Returns true if creating checkpoints and performing rollbacks is
* possible with this particular Fork instance.
*
* @see #createCheckpoint()
* @see #rollback()
*/
private static native boolean nativeCanRollback(long nativeHandle);

/**
* Creates in-memory checkpoint that can be used to rollback changes.
*/
Expand All @@ -231,13 +254,4 @@ private void replaceIndexCleaner() {
* this particular Fork instance.
*/
private static native void nativeRollback(long nativeHandle);

/**
* Returns true if creating checkpoints and performing rollbacks is
* possible with this particular Fork instance.
*
* @see #createCheckpoint()
* @see #rollback()
*/
private static native boolean nativeCanRollback(long nativeHandle);
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

Expand Down Expand Up @@ -117,6 +118,18 @@ void initializeService() throws CloseFailuresException {
verify(serviceRuntime).initializeService(serviceId, fork, configuration);
}

@Test
void beforeCommit() throws CloseFailuresException {
long forkHandle = 0x110b;
Fork fork = mock(Fork.class);
when(viewFactory.createFork(eq(forkHandle), any(Cleaner.class)))
.thenReturn(fork);

serviceRuntimeAdapter.beforeCommit(forkHandle);

verify(serviceRuntime).beforeCommit(fork);
}

@Test
void afterCommit_ValidatorNode() throws CloseFailuresException {
when(viewFactory.createSnapshot(eq(SNAPSHOT_HANDLE), any(Cleaner.class)))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

Expand All @@ -50,6 +51,7 @@
import com.exonum.binding.core.transaction.TransactionContext;
import com.exonum.binding.core.transport.Server;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.protobuf.ByteString;
import io.vertx.ext.web.Router;
import java.nio.file.Path;
Expand Down Expand Up @@ -373,6 +375,34 @@ void getStateHashesSingleService() throws CloseFailuresException {
}
}

@Test
void beforeCommitSingleService() throws CloseFailuresException {
try (Database database = TemporaryDb.newInstance();
Cleaner cleaner = new Cleaner()) {
Fork fork = database.createFork(cleaner);

serviceRuntime.beforeCommit(fork);

verify(serviceWrapper).beforeCommit(fork);
}
}

@Test
void beforeCommitThrowingServiceChangesAreRolledBack() throws CloseFailuresException {
try (Database database = TemporaryDb.newInstance();
Cleaner cleaner = new Cleaner()) {
Fork fork = spy(database.createFork(cleaner));
doThrow(IllegalStateException.class).when(serviceWrapper).beforeCommit(fork);

serviceRuntime.beforeCommit(fork);

InOrder inOrder = Mockito.inOrder(fork, serviceWrapper);
inOrder.verify(fork).createCheckpoint();
inOrder.verify(serviceWrapper).beforeCommit(fork);
inOrder.verify(fork).rollback();
}
}

@Test
void afterCommitSingleService() {
BlockCommittedEvent event = mock(BlockCommittedEvent.class);
Expand Down Expand Up @@ -485,6 +515,37 @@ void getStateHashesMultipleServices() throws CloseFailuresException {
}
}

@Test
void beforeCommitMultipleServicesWithFirstThrowing() throws CloseFailuresException {
try (Database database = TemporaryDb.newInstance();
Cleaner cleaner = new Cleaner()) {
Collection<ServiceWrapper> services = SERVICES.values();

// Setup the first service to throw exception in its before commit handler
ServiceWrapper service1 = services
.iterator()
.next();
Fork fork = spy(database.createFork(cleaner));
doThrow(RuntimeException.class).when(service1).beforeCommit(fork);

// Notify the runtime before the block commit
serviceRuntime.beforeCommit(fork);

// Verify that each service got the notifications, i.e., the first service
// throwing an exception has not disrupted the notification process
Object[] mocks = Lists.asList(fork, services.toArray()).toArray();
InOrder inOrder = Mockito.inOrder(mocks);
for (ServiceWrapper service : services) {
inOrder.verify(fork).createCheckpoint();
inOrder.verify(service).beforeCommit(fork);
// Verify the fork was rolled-back once after the throwing service
if (service.equals(service1)) {
inOrder.verify(fork).rollback();
}
}
}
}

@Test
void afterCommitMultipleServicesWithFirstThrowing() {
Collection<ServiceWrapper> services = SERVICES.values();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import com.exonum.binding.core.storage.indices.ListIndexProxy;
import com.exonum.binding.test.RequiresNativeLibrary;
import java.util.Iterator;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;

@RequiresNativeLibrary
Expand Down Expand Up @@ -147,6 +148,56 @@ void rollbacksChangesMadeSinceLastCheckpoint() throws Exception {
}
}

@Test
@DisplayName("rollback shall work when multiple checkpoints are created, reverting to the"
+ "state as of the last checkpoint")
void rollbacksToTheLastCheckpointWhenMultipleAreCreated() throws Exception {
try (TemporaryDb db = TemporaryDb.newInstance();
Cleaner cleaner = new Cleaner("parent")) {
Fork fork = db.createFork(cleaner);

// Create a list with a single element
String listName = "list";

// Modify and create the first checkpoint
{
ListIndex<String> list = newList(listName, fork);
list.add("s1");
fork.createCheckpoint();
}

// Modify and create the second checkpoint
{
ListIndex<String> list = newList(listName, fork);
list.add("s2");
fork.createCheckpoint();
}

// Modify the list
{
ListIndex<String> list = newList(listName, fork);
list.add("s3");
assertThat(list).containsExactly("s1", "s2", "s3");
}

// Rollback the changes: must restore the state as of the second checkpoint
fork.rollback();
{
ListIndex<String> list = newList(listName, fork);
assertThat(list).containsExactly("s1", "s2");
}

// Rollback again: as no nested (stacked) checkpoints are supported,
// the first checkpoint is no longer available and any rollback will revert
// the state to the last (second) checkpoint
fork.rollback();
{
ListIndex<String> list = newList(listName, fork);
assertThat(list).containsExactly("s1", "s2");
}
}
}

@Test
void rollbackDoesNotAffectDatabase() throws Exception {
try (TemporaryDb db = TemporaryDb.newInstance();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

package com.exonum.binding.core.storage.database;

import static org.assertj.core.api.Java6Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertThrows;

import com.exonum.binding.common.serialization.StandardSerializers;
Expand Down