Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions docs/static/rest-catalog-open-api.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2763,6 +2763,10 @@ components:
properties:
instant:
$ref: '#/components/schemas/Instant'
fromSnapshot:
type: integer
format: int64
nullable: true
Instant:
anyOf:
- $ref: '#/components/schemas/SnapshotInstant'
Expand Down
18 changes: 17 additions & 1 deletion paimon-api/src/main/java/org/apache/paimon/rest/RESTApi.java
Original file line number Diff line number Diff line change
Expand Up @@ -595,7 +595,23 @@ public boolean commitSnapshot(
* this table
*/
public void rollbackTo(Identifier identifier, Instant instant) {
RollbackTableRequest request = new RollbackTableRequest(instant);
rollbackTo(identifier, instant, null);
}

/**
* Rollback instant for table.
*
* @param identifier database name and table name.
* @param instant instant to rollback
* @param fromSnapshot snapshot from, success only occurs when the latest snapshot is this
* snapshot.
* @throws NoSuchResourceException Exception thrown on HTTP 404 means the table or the snapshot
* or the tag not exists
* @throws ForbiddenException Exception thrown on HTTP 403 means don't have the permission for
* this table
*/
public void rollbackTo(Identifier identifier, Instant instant, @Nullable Long fromSnapshot) {
RollbackTableRequest request = new RollbackTableRequest(instant, fromSnapshot);
client.post(
resourcePaths.rollbackTable(
identifier.getDatabaseName(), identifier.getObjectName()),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,24 +24,42 @@
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonGetter;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonProperty;

import javax.annotation.Nullable;

/** Request for rollback table. */
@JsonIgnoreProperties(ignoreUnknown = true)
public class RollbackTableRequest implements RESTRequest {

private static final String FIELD_INSTANT = "instant";
private static final String FIELD_FROM_SNAPSHOT = "fromSnapshot";

@JsonProperty(FIELD_INSTANT)
private final Instant instant;

@JsonProperty(FIELD_FROM_SNAPSHOT)
@JsonInclude(JsonInclude.Include.NON_NULL)
@Nullable
private final Long fromSnapshot;

@JsonCreator
public RollbackTableRequest(@JsonProperty(FIELD_INSTANT) Instant instant) {
public RollbackTableRequest(
@JsonProperty(FIELD_INSTANT) Instant instant,
@JsonProperty(FIELD_FROM_SNAPSHOT) @Nullable Long fromSnapshot) {
this.instant = instant;
this.fromSnapshot = fromSnapshot;
}

@JsonGetter(FIELD_INSTANT)
public Instant getInstant() {
return instant;
}

@JsonGetter(FIELD_FROM_SNAPSHOT)
@Nullable
public Long getFromSnapshot() {
return fromSnapshot;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -558,7 +558,7 @@ public PagedList<Snapshot> listSnapshotsPaged(
}

@Override
public void rollbackTo(Identifier identifier, Instant instant)
public void rollbackTo(Identifier identifier, Instant instant, @Nullable Long fromSnapshot)
throws Catalog.TableNotExistException {
throw new UnsupportedOperationException();
}
Expand Down
19 changes: 18 additions & 1 deletion paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
Original file line number Diff line number Diff line change
Expand Up @@ -717,7 +717,24 @@ PagedList<Snapshot> listSnapshotsPaged(
* @throws UnsupportedOperationException if the catalog does not {@link
* #supportsVersionManagement()}
*/
void rollbackTo(Identifier identifier, Instant instant) throws Catalog.TableNotExistException;
default void rollbackTo(Identifier identifier, Instant instant)
throws Catalog.TableNotExistException {
rollbackTo(identifier, instant, null);
}

/**
* rollback table by the given {@link Identifier} and instant.
*
* @param identifier path of the table
* @param instant like snapshotId or tagName
* @param fromSnapshot snapshot from, success only occurs when the latest snapshot is this
* snapshot.
* @throws Catalog.TableNotExistException if the table does not exist
* @throws UnsupportedOperationException if the catalog does not {@link
* #supportsVersionManagement()}
*/
void rollbackTo(Identifier identifier, Instant instant, @Nullable Long fromSnapshot)
throws Catalog.TableNotExistException;

/**
* Create a new branch for this table. By default, an empty branch will be created using the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -207,9 +207,9 @@ public PagedList<Snapshot> listSnapshotsPaged(
}

@Override
public void rollbackTo(Identifier identifier, Instant instant)
public void rollbackTo(Identifier identifier, Instant instant, @Nullable Long fromSnapshot)
throws Catalog.TableNotExistException {
wrapped.rollbackTo(identifier, instant);
wrapped.rollbackTo(identifier, instant, fromSnapshot);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -373,10 +373,10 @@ public boolean commitSnapshot(
}

@Override
public void rollbackTo(Identifier identifier, Instant instant)
public void rollbackTo(Identifier identifier, Instant instant, @Nullable Long fromSnapshot)
throws Catalog.TableNotExistException {
try {
api.rollbackTo(identifier, instant);
api.rollbackTo(identifier, instant, fromSnapshot);
} catch (NoSuchResourceException e) {
if (StringUtils.equals(e.resourceType(), ErrorResponse.RESOURCE_TYPE_SNAPSHOT)) {
throw new IllegalArgumentException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -267,11 +267,11 @@ public static GetTableTokenResponse getTableCredentialsResponse() {
}

public static RollbackTableRequest rollbackTableRequestBySnapshot(long snapshotId) {
return new RollbackTableRequest(Instant.snapshot(snapshotId));
return new RollbackTableRequest(Instant.snapshot(snapshotId), null);
}

public static RollbackTableRequest rollbackTableRequestByTag(String tagName) {
return new RollbackTableRequest(Instant.tag(tagName));
return new RollbackTableRequest(Instant.tag(tagName), null);
}

public static AlterViewRequest alterViewRequest() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,8 @@
import org.slf4j.LoggerFactory;
import org.testcontainers.shaded.com.google.common.collect.ImmutableMap;

import javax.annotation.Nullable;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.UncheckedIOException;
Expand Down Expand Up @@ -478,7 +480,8 @@ public MockResponse dispatch(RecordedRequest request) {
long snapshotId =
((Instant.SnapshotInstant) requestBody.getInstant())
.getSnapshotId();
return rollbackTableByIdHandle(identifier, snapshotId);
return rollbackTableByIdHandle(
identifier, snapshotId, requestBody.getFromSnapshot());
} else if (requestBody.getInstant() instanceof Instant.TagInstant) {
String tagName =
((Instant.TagInstant) requestBody.getInstant())
Expand Down Expand Up @@ -844,26 +847,35 @@ private MockResponse commitTableHandle(Identifier identifier, String data) throw
requestBody.getStatistics());
}

private MockResponse rollbackTableByIdHandle(Identifier identifier, long snapshotId)
throws Exception {
private MockResponse rollbackTableByIdHandle(
Identifier identifier, long snapshotId, @Nullable Long fromSnapshot) throws Exception {
FileStoreTable table = getFileTable(identifier);
String identifierWithSnapshotId = geTableFullNameWithSnapshotId(identifier, snapshotId);
if (tableWithSnapshotId2SnapshotStore.containsKey(identifierWithSnapshotId)) {
table =
table.copy(
Collections.singletonMap(
SNAPSHOT_CLEAN_EMPTY_DIRECTORIES.key(), "true"));
long latestSnapshotId = table.snapshotManager().latestSnapshotId();
table.rollbackTo(snapshotId);
cleanSnapshot(identifier, snapshotId, latestSnapshotId);
tableLatestSnapshotStore.put(
identifier.getFullName(),
tableWithSnapshotId2SnapshotStore.get(identifierWithSnapshotId));
return new MockResponse().setResponseCode(200);
TableSnapshot toSnapshot = tableWithSnapshotId2SnapshotStore.get(identifierWithSnapshotId);
if (toSnapshot == null) {
return mockResponse(
new ErrorResponse(
ErrorResponse.RESOURCE_TYPE_SNAPSHOT, "" + snapshotId, "", 404),
404);
}
return mockResponse(
new ErrorResponse(ErrorResponse.RESOURCE_TYPE_SNAPSHOT, "" + snapshotId, "", 404),
404);
long latestSnapshotId = table.snapshotManager().latestSnapshotId();
if (fromSnapshot != null && fromSnapshot != latestSnapshotId) {
return mockResponse(
new ErrorResponse(
null,
null,
String.format(
"Latest snapshot %s is not %s", latestSnapshotId, fromSnapshot),
500),
500);
}
table =
table.copy(
Collections.singletonMap(SNAPSHOT_CLEAN_EMPTY_DIRECTORIES.key(), "true"));
table.rollbackTo(snapshotId);
cleanSnapshot(identifier, snapshotId, latestSnapshotId);
tableLatestSnapshotStore.put(identifier.getFullName(), toSnapshot);
return new MockResponse().setResponseCode(200);
}

private MockResponse rollbackTableByTagNameHandle(Identifier identifier, String tagName)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Instant;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.TableSnapshot;
import org.apache.paimon.table.object.ObjectTable;
Expand Down Expand Up @@ -1751,24 +1752,32 @@ public void testTableRollback() throws Exception {
GenericRow record = GenericRow.of(i);
write.write(record);
commit.commit(i, write.prepareCommit(false, i));
table.createTag("tag-" + i);
table.createTag("tag-" + (i + 1));
}
write.close();
commit.close();

// rollback to snapshot 4
long rollbackToSnapshotId = 4;
table.rollbackTo(rollbackToSnapshotId);
assertThat(table.snapshotManager().snapshot(rollbackToSnapshotId))
.isEqualTo(restCatalog.loadSnapshot(identifier).get().snapshot());
assertThat(table.tagManager().tagExists("tag-" + (rollbackToSnapshotId + 2))).isFalse();
assertThat(table.snapshotManager().snapshotExists(rollbackToSnapshotId + 1)).isFalse();

assertThrows(
IllegalArgumentException.class, () -> table.rollbackTo(rollbackToSnapshotId + 1));

// rollback to snapshot 3
String rollbackToTagName = "tag-" + (rollbackToSnapshotId - 1);
table.rollbackTo(rollbackToTagName);
Snapshot tagSnapshot = table.tagManager().getOrThrow(rollbackToTagName).trimToSnapshot();
assertThat(tagSnapshot).isEqualTo(restCatalog.loadSnapshot(identifier).get().snapshot());

// rollback to snapshot 2 from snapshot
assertThatThrownBy(() -> catalog.rollbackTo(identifier, Instant.snapshot(2L), 4L))
.hasMessageContaining("Latest snapshot 3 is not 4");
catalog.rollbackTo(identifier, Instant.snapshot(2L), 3L);
assertThat(table.latestSnapshot().get().id()).isEqualTo(2);
}

@Test
Expand Down
Loading