Skip to content

Commit 76a2f00

Browse files
committed
add clean for committer
1 parent 38621dc commit 76a2f00

File tree

5 files changed

+20
-16
lines changed

5 files changed

+20
-16
lines changed

paimon-common/src/main/java/org/apache/paimon/fs/BaseMultiPartUploadCommitter.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,4 +83,7 @@ public void discard(FileIO fileIO) throws IOException {
8383
public Path targetFilePath() {
8484
return new Path(objectName);
8585
}
86+
87+
@Override
88+
public void clean(FileIO fileIO) throws IOException {}
8689
}

paimon-common/src/main/java/org/apache/paimon/fs/RenamingTwoPhaseOutputStream.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@
2929
*/
3030
@Public
3131
public class RenamingTwoPhaseOutputStream extends TwoPhaseOutputStream {
32-
public static final String TEMP_DIR_NAME = "_temporary";
32+
private static final String TEMP_DIR_NAME = "_temporary";
3333

3434
private final Path targetPath;
3535
private final Path tempPath;
@@ -133,5 +133,13 @@ public void discard(FileIO fileIO) throws IOException {
133133
public Path targetFilePath() {
134134
return targetPath;
135135
}
136+
137+
@Override
138+
public void clean(FileIO fileIO) throws IOException {
139+
Path path = tempPath.getParent();
140+
if (fileIO.exists(path)) {
141+
fileIO.deleteQuietly(path);
142+
}
143+
}
136144
}
137145
}

paimon-common/src/main/java/org/apache/paimon/fs/TwoPhaseOutputStream.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,5 +53,7 @@ public interface Committer extends Serializable {
5353
void discard(FileIO fileIO) throws IOException;
5454

5555
Path targetFilePath();
56+
57+
void clean(FileIO fileIO) throws IOException;
5658
}
5759
}

paimon-common/src/test/java/org/apache/paimon/fs/MultiPartUploadTwoPhaseOutputStreamTest.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -322,6 +322,9 @@ public void discard(FileIO fileIO) throws IOException {
322322
public Path targetFilePath() {
323323
return new Path(objectName);
324324
}
325+
326+
@Override
327+
public void clean(FileIO fileIO) throws IOException {}
325328
}
326329

327330
private static final class TestPart {

paimon-core/src/main/java/org/apache/paimon/table/format/FormatTableCommit.java

Lines changed: 3 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import org.apache.paimon.fs.FileIO;
2222
import org.apache.paimon.fs.FileStatus;
2323
import org.apache.paimon.fs.Path;
24-
import org.apache.paimon.fs.RenamingTwoPhaseOutputStream;
2524
import org.apache.paimon.fs.TwoPhaseOutputStream;
2625
import org.apache.paimon.metrics.MetricRegistry;
2726
import org.apache.paimon.stats.Statistics;
@@ -102,21 +101,10 @@ public void commit(List<CommitMessage> commitMessages) {
102101
for (TwoPhaseOutputStream.Committer committer : committers) {
103102
committer.commit(this.fileIO);
104103
}
105-
if (committers.stream()
106-
.filter(c -> c instanceof RenamingTwoPhaseOutputStream.TempFileCommitter)
107-
.findAny()
108-
.isPresent()) {
109-
if (partitionPaths.size() > 1) {
110-
for (Path partitionPath : partitionPaths) {
111-
Path tempPath =
112-
new Path(partitionPath, RenamingTwoPhaseOutputStream.TEMP_DIR_NAME);
113-
fileIO.deleteQuietly(tempPath);
114-
}
115-
} else {
116-
Path tempPath = new Path(location, RenamingTwoPhaseOutputStream.TEMP_DIR_NAME);
117-
fileIO.deleteQuietly(tempPath);
118-
}
104+
for (TwoPhaseOutputStream.Committer committer : committers) {
105+
committer.clean(this.fileIO);
119106
}
107+
120108
} catch (Exception e) {
121109
this.abort(commitMessages);
122110
throw new RuntimeException(e);

0 commit comments

Comments
 (0)