diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java index 033c06d9e8a0..e5d4cd2ff20c 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java @@ -40,8 +40,17 @@ import org.apache.paimon.manifest.ManifestList; import org.apache.paimon.manifest.PartitionEntry; import org.apache.paimon.manifest.SimpleFileEntry; +import org.apache.paimon.operation.commit.CommitChanges; +import org.apache.paimon.operation.commit.CommitChangesProvider; +import org.apache.paimon.operation.commit.CommitCleaner; +import org.apache.paimon.operation.commit.CommitKindProvider; +import org.apache.paimon.operation.commit.CommitResult; +import org.apache.paimon.operation.commit.CommitScanner; import org.apache.paimon.operation.commit.ConflictDetection; import org.apache.paimon.operation.commit.ConflictDetection.ConflictCheck; +import org.apache.paimon.operation.commit.ManifestEntryChanges; +import org.apache.paimon.operation.commit.RetryCommitResult; +import org.apache.paimon.operation.commit.SuccessCommitResult; import org.apache.paimon.operation.metrics.CommitMetrics; import org.apache.paimon.operation.metrics.CommitStats; import org.apache.paimon.options.MemorySize; @@ -56,7 +65,6 @@ import org.apache.paimon.table.sink.CommitCallback; import org.apache.paimon.table.sink.CommitMessage; import org.apache.paimon.table.sink.CommitMessageImpl; -import org.apache.paimon.table.source.ScanMode; import org.apache.paimon.types.RowType; import org.apache.paimon.utils.DataFilePathFactories; import org.apache.paimon.utils.FileStorePathFactory; @@ -93,6 +101,7 @@ import static org.apache.paimon.operation.commit.ConflictDetection.hasConflictChecked; import static org.apache.paimon.operation.commit.ConflictDetection.mustConflictCheck; import static org.apache.paimon.operation.commit.ConflictDetection.noConflictCheck; +import static org.apache.paimon.operation.commit.ManifestEntryChanges.changedPartitions; import static org.apache.paimon.partition.PartitionPredicate.createBinaryPartitions; import static org.apache.paimon.partition.PartitionPredicate.createPartitionPredicate; import static org.apache.paimon.utils.Preconditions.checkArgument; @@ -134,7 +143,7 @@ public class FileStoreCommitImpl implements FileStoreCommit { private final ManifestFile manifestFile; private final ManifestList manifestList; private final IndexManifestFile indexManifestFile; - private final FileStoreScan scan; + private final CommitScanner scanner; private final int numBucket; private final MemorySize manifestTargetSize; private final MemorySize manifestFullCompactionSize; @@ -154,6 +163,7 @@ public class FileStoreCommitImpl implements FileStoreCommit { private final boolean rowTrackingEnabled; private final boolean discardDuplicateFiles; private final ConflictDetection conflictDetection; + private final CommitCleaner commitCleaner; private boolean ignoreEmptyCommit; private CommitMetrics commitMetrics; @@ -205,11 +215,7 @@ public FileStoreCommitImpl( this.manifestFile = manifestFileFactory.create(); this.manifestList = manifestListFactory.create(); this.indexManifestFile = indexManifestFileFactory.create(); - this.scan = scan; - // Stats in DELETE Manifest Entries is useless - if (options.manifestDeleteFileDropStats()) { - this.scan.dropStats(); - } + this.scanner = new CommitScanner(scan, indexManifestFile, options); this.numBucket = numBucket; this.manifestTargetSize = manifestTargetSize; this.manifestFullCompactionSize = manifestFullCompactionSize; @@ -229,7 +235,6 @@ public FileStoreCommitImpl( partitionType, partitionType.getFieldNames().toArray(new String[0]), options.legacyPartitionName()); - this.ignoreEmptyCommit = true; this.commitMetrics = null; this.statsFileHandler = statsFileHandler; @@ -237,6 +242,7 @@ public FileStoreCommitImpl( this.rowTrackingEnabled = rowTrackingEnabled; this.discardDuplicateFiles = discardDuplicateFiles; this.conflictDetection = conflictDetection; + this.commitCleaner = new CommitCleaner(manifestList, manifestFile, indexManifestFile); } @Override @@ -305,26 +311,14 @@ public int commit(ManifestCommittable committable, boolean checkAppendFiles) { Long safeLatestSnapshotId = null; List baseEntries = new ArrayList<>(); - List appendTableFiles = new ArrayList<>(); - List appendChangelog = new ArrayList<>(); - List appendIndexFiles = new ArrayList<>(); - List compactTableFiles = new ArrayList<>(); - List compactChangelog = new ArrayList<>(); - List compactIndexFiles = new ArrayList<>(); - collectChanges( - committable.fileCommittables(), - appendTableFiles, - appendChangelog, - appendIndexFiles, - compactTableFiles, - compactChangelog, - compactIndexFiles); + ManifestEntryChanges changes = collectChanges(committable.fileCommittables()); try { - List appendSimpleEntries = SimpleFileEntry.from(appendTableFiles); + List appendSimpleEntries = + SimpleFileEntry.from(changes.appendTableFiles); if (!ignoreEmptyCommit - || !appendTableFiles.isEmpty() - || !appendChangelog.isEmpty() - || !appendIndexFiles.isEmpty()) { + || !changes.appendTableFiles.isEmpty() + || !changes.appendChangelog.isEmpty() + || !changes.appendIndexFiles.isEmpty()) { // Optimization for common path. // Step 1: // Read manifest entries from changed partitions here and check for conflicts. @@ -335,7 +329,8 @@ public int commit(ManifestCommittable committable, boolean checkAppendFiles) { latestSnapshot = snapshotManager.latestSnapshot(); CommitKind commitKind = CommitKind.APPEND; ConflictCheck conflictCheck = noConflictCheck(); - if (containsFileDeletionOrDeletionVectors(appendSimpleEntries, appendIndexFiles)) { + if (containsFileDeletionOrDeletionVectors( + appendSimpleEntries, changes.appendIndexFiles)) { commitKind = CommitKind.OVERWRITE; conflictCheck = mustConflictCheck(); } else if (latestSnapshot != null && appendCommitCheckConflict) { @@ -351,51 +346,54 @@ public int commit(ManifestCommittable committable, boolean checkAppendFiles) { // it is possible that some partitions only have compact changes, // so we need to contain all changes baseEntries.addAll( - readAllEntriesFromChangedPartitions( + scanner.readAllEntriesFromChangedPartitions( latestSnapshot, changedPartitions( - appendTableFiles, - compactTableFiles, - appendIndexFiles))); + changes.appendTableFiles, + changes.compactTableFiles, + changes.appendIndexFiles))); if (discardDuplicate) { Set baseIdentifiers = baseEntries.stream() .map(FileEntry::identifier) .collect(Collectors.toSet()); - appendTableFiles = - appendTableFiles.stream() + changes.appendTableFiles = + changes.appendTableFiles.stream() .filter( entry -> !baseIdentifiers.contains( entry.identifier())) .collect(Collectors.toList()); - appendSimpleEntries = SimpleFileEntry.from(appendTableFiles); + appendSimpleEntries = SimpleFileEntry.from(changes.appendTableFiles); } conflictDetection.checkNoConflictsOrFail( latestSnapshot, baseEntries, appendSimpleEntries, - appendIndexFiles, + changes.appendIndexFiles, commitKind); safeLatestSnapshotId = latestSnapshot.id(); } attempts += tryCommit( - provider(appendTableFiles, appendChangelog, appendIndexFiles), + CommitChangesProvider.provider( + changes.appendTableFiles, + changes.appendChangelog, + changes.appendIndexFiles), committable.identifier(), committable.watermark(), committable.logOffsets(), committable.properties(), - provider(commitKind), + CommitKindProvider.provider(commitKind), conflictCheck, null); generatedSnapshot += 1; } - if (!compactTableFiles.isEmpty() - || !compactChangelog.isEmpty() - || !compactIndexFiles.isEmpty()) { + if (!changes.compactTableFiles.isEmpty() + || !changes.compactChangelog.isEmpty() + || !changes.compactIndexFiles.isEmpty()) { // Optimization for common path. // Step 2: // Add appendChanges to the manifest entries read above and check for conflicts. @@ -408,8 +406,8 @@ public int commit(ManifestCommittable committable, boolean checkAppendFiles) { conflictDetection.checkNoConflictsOrFail( latestSnapshot, baseEntries, - SimpleFileEntry.from(compactTableFiles), - compactIndexFiles, + SimpleFileEntry.from(changes.compactTableFiles), + changes.compactIndexFiles, CommitKind.COMPACT); // assume this compact commit follows just after the append commit created above safeLatestSnapshotId += 1; @@ -417,12 +415,15 @@ public int commit(ManifestCommittable committable, boolean checkAppendFiles) { attempts += tryCommit( - provider(compactTableFiles, compactChangelog, compactIndexFiles), + CommitChangesProvider.provider( + changes.compactTableFiles, + changes.compactChangelog, + changes.compactIndexFiles), committable.identifier(), committable.watermark(), committable.logOffsets(), committable.properties(), - provider(CommitKind.COMPACT), + CommitKindProvider.provider(CommitKind.COMPACT), hasConflictChecked(safeLatestSnapshotId), null); generatedSnapshot += 1; @@ -435,10 +436,10 @@ public int commit(ManifestCommittable committable, boolean checkAppendFiles) { commitDuration); if (this.commitMetrics != null) { reportCommit( - appendTableFiles, - appendChangelog, - compactTableFiles, - compactChangelog, + changes.appendTableFiles, + changes.appendChangelog, + changes.compactTableFiles, + changes.compactChangelog, commitDuration, generatedSnapshot, attempts); @@ -502,32 +503,19 @@ public int overwritePartition( long started = System.nanoTime(); int generatedSnapshot = 0; int attempts = 0; - List appendTableFiles = new ArrayList<>(); - List appendChangelog = new ArrayList<>(); - List appendIndexFiles = new ArrayList<>(); - List compactTableFiles = new ArrayList<>(); - List compactChangelog = new ArrayList<>(); - List compactIndexFiles = new ArrayList<>(); - collectChanges( - committable.fileCommittables(), - appendTableFiles, - appendChangelog, - appendIndexFiles, - compactTableFiles, - compactChangelog, - compactIndexFiles); - - if (!appendChangelog.isEmpty() || !compactChangelog.isEmpty()) { + + ManifestEntryChanges changes = collectChanges(committable.fileCommittables()); + if (!changes.appendChangelog.isEmpty() || !changes.compactChangelog.isEmpty()) { StringBuilder warnMessage = new StringBuilder( "Overwrite mode currently does not commit any changelog.\n" + "Please make sure that the partition you're overwriting " + "is not being consumed by a streaming reader.\n" + "Ignored changelog files are:\n"); - for (ManifestEntry entry : appendChangelog) { + for (ManifestEntry entry : changes.appendChangelog) { warnMessage.append(" * ").append(entry.toString()).append("\n"); } - for (ManifestEntry entry : compactChangelog) { + for (ManifestEntry entry : changes.compactChangelog) { warnMessage.append(" * ").append(entry.toString()).append("\n"); } LOG.warn(warnMessage.toString()); @@ -538,12 +526,12 @@ public int overwritePartition( // partition filter is built from static or dynamic partition according to properties PartitionPredicate partitionFilter = null; if (dynamicPartitionOverwrite) { - if (appendTableFiles.isEmpty()) { + if (changes.appendTableFiles.isEmpty()) { // in dynamic mode, if there is no changes to commit, no data will be deleted skipOverwrite = true; } else { Set partitions = - appendTableFiles.stream() + changes.appendTableFiles.stream() .map(ManifestEntry::partition) .collect(Collectors.toSet()); partitionFilter = PartitionPredicate.fromMultiple(partitionType, partitions); @@ -556,7 +544,7 @@ public int overwritePartition( PartitionPredicate.fromPredicate(partitionType, partitionPredicate); // sanity check, all changes must be done within the given partition if (partitionFilter != null) { - for (ManifestEntry entry : appendTableFiles) { + for (ManifestEntry entry : changes.appendTableFiles) { if (!partitionFilter.test(entry.partition())) { throw new IllegalArgumentException( "Trying to overwrite partition " @@ -569,11 +557,12 @@ public int overwritePartition( } } - boolean withCompact = !compactTableFiles.isEmpty() || !compactIndexFiles.isEmpty(); + boolean withCompact = + !changes.compactTableFiles.isEmpty() || !changes.compactIndexFiles.isEmpty(); if (!withCompact) { // try upgrade - appendTableFiles = tryUpgrade(appendTableFiles); + changes.appendTableFiles = tryUpgrade(changes.appendTableFiles); } // overwrite new files @@ -581,8 +570,8 @@ public int overwritePartition( attempts += tryOverwritePartition( partitionFilter, - appendTableFiles, - appendIndexFiles, + changes.appendTableFiles, + changes.appendIndexFiles, committable.identifier(), committable.watermark(), committable.logOffsets(), @@ -593,12 +582,15 @@ public int overwritePartition( if (withCompact) { attempts += tryCommit( - provider(compactTableFiles, emptyList(), compactIndexFiles), + CommitChangesProvider.provider( + changes.compactTableFiles, + emptyList(), + changes.compactIndexFiles), committable.identifier(), committable.watermark(), committable.logOffsets(), committable.properties(), - provider(CommitKind.COMPACT), + CommitKindProvider.provider(CommitKind.COMPACT), mustConflictCheck(), null); generatedSnapshot += 1; @@ -608,9 +600,9 @@ public int overwritePartition( LOG.info("Finished overwrite to table {}, duration {} ms", tableName, commitDuration); if (this.commitMetrics != null) { reportCommit( - appendTableFiles, + changes.appendTableFiles, emptyList(), - compactTableFiles, + changes.compactTableFiles, emptyList(), commitDuration, generatedSnapshot, @@ -745,12 +737,12 @@ public FileStoreCommit withMetrics(CommitMetrics metrics) { public void commitStatistics(Statistics stats, long commitIdentifier) { String statsFileName = statsFileHandler.writeStats(stats); tryCommit( - provider(emptyList(), emptyList(), emptyList()), + CommitChangesProvider.provider(emptyList(), emptyList(), emptyList()), commitIdentifier, null, Collections.emptyMap(), Collections.emptyMap(), - provider(CommitKind.ANALYZE), + CommitKindProvider.provider(CommitKind.ANALYZE), noConflictCheck(), statsFileName); } @@ -765,128 +757,15 @@ public FileIO fileIO() { return fileIO; } - private void collectChanges( - List commitMessages, - List appendTableFiles, - List appendChangelog, - List appendIndexFiles, - List compactTableFiles, - List compactChangelog, - List compactIndexFiles) { - for (CommitMessage message : commitMessages) { - CommitMessageImpl commitMessage = (CommitMessageImpl) message; - commitMessage - .newFilesIncrement() - .newFiles() - .forEach(m -> appendTableFiles.add(makeEntry(FileKind.ADD, commitMessage, m))); - commitMessage - .newFilesIncrement() - .deletedFiles() - .forEach( - m -> - appendTableFiles.add( - makeEntry(FileKind.DELETE, commitMessage, m))); - commitMessage - .newFilesIncrement() - .changelogFiles() - .forEach(m -> appendChangelog.add(makeEntry(FileKind.ADD, commitMessage, m))); - commitMessage - .newFilesIncrement() - .deletedIndexFiles() - .forEach( - m -> - appendIndexFiles.add( - new IndexManifestEntry( - FileKind.DELETE, - commitMessage.partition(), - commitMessage.bucket(), - m))); - commitMessage - .newFilesIncrement() - .newIndexFiles() - .forEach( - m -> - appendIndexFiles.add( - new IndexManifestEntry( - FileKind.ADD, - commitMessage.partition(), - commitMessage.bucket(), - m))); - - commitMessage - .compactIncrement() - .compactBefore() - .forEach( - m -> - compactTableFiles.add( - makeEntry(FileKind.DELETE, commitMessage, m))); - commitMessage - .compactIncrement() - .compactAfter() - .forEach(m -> compactTableFiles.add(makeEntry(FileKind.ADD, commitMessage, m))); - commitMessage - .compactIncrement() - .changelogFiles() - .forEach(m -> compactChangelog.add(makeEntry(FileKind.ADD, commitMessage, m))); - commitMessage - .compactIncrement() - .deletedIndexFiles() - .forEach( - m -> - compactIndexFiles.add( - new IndexManifestEntry( - FileKind.DELETE, - commitMessage.partition(), - commitMessage.bucket(), - m))); - commitMessage - .compactIncrement() - .newIndexFiles() - .forEach( - m -> - compactIndexFiles.add( - new IndexManifestEntry( - FileKind.ADD, - commitMessage.partition(), - commitMessage.bucket(), - m))); - } - if (!commitMessages.isEmpty()) { - List msg = new ArrayList<>(); - if (!appendTableFiles.isEmpty()) { - msg.add(appendTableFiles.size() + " append table files"); - } - if (!appendChangelog.isEmpty()) { - msg.add(appendChangelog.size() + " append Changelogs"); - } - if (!appendIndexFiles.isEmpty()) { - msg.add(appendIndexFiles.size() + " append index files"); - } - if (!compactTableFiles.isEmpty()) { - msg.add(compactTableFiles.size() + " compact table files"); - } - if (!compactChangelog.isEmpty()) { - msg.add(compactChangelog.size() + " compact Changelogs"); - } - if (!compactIndexFiles.isEmpty()) { - msg.add(compactIndexFiles.size() + " compact index files"); - } - LOG.info("Finished collecting changes, including: {}", String.join(", ", msg)); - } - } - - private ManifestEntry makeEntry(FileKind kind, CommitMessage commitMessage, DataFileMeta file) { - Integer totalBuckets = commitMessage.totalBuckets(); - if (totalBuckets == null) { - totalBuckets = numBucket; - } - - return ManifestEntry.create( - kind, commitMessage.partition(), commitMessage.bucket(), totalBuckets, file); + private ManifestEntryChanges collectChanges(List commitMessages) { + ManifestEntryChanges changes = new ManifestEntryChanges(numBucket); + commitMessages.forEach(changes::collect); + LOG.info("Finished collecting changes, including: {}", changes); + return changes; } private int tryCommit( - ChangesProvider changesProvider, + CommitChangesProvider changesProvider, long identifier, @Nullable Long watermark, Map logOffsets, @@ -895,7 +774,7 @@ private int tryCommit( ConflictCheck conflictCheck, @Nullable String statsFileName) { int retryCount = 0; - RetryResult retryResult = null; + RetryCommitResult retryResult = null; long startMillis = System.currentTimeMillis(); while (true) { Snapshot latestSnapshot = snapshotManager.latestSnapshot(); @@ -920,7 +799,7 @@ private int tryCommit( break; } - retryResult = (RetryResult) result; + retryResult = (RetryCommitResult) result; if (System.currentTimeMillis() - startMillis > commitTimeout || retryCount >= commitMaxRetries) { @@ -959,7 +838,8 @@ private int tryOverwritePartition( : CommitKind.APPEND; return tryCommit( latestSnapshot -> - overwriteChanges(changes, indexFiles, latestSnapshot, partitionFilter), + scanner.readOverwriteChanges( + numBucket, changes, indexFiles, latestSnapshot, partitionFilter), identifier, watermark, logOffsets, @@ -969,51 +849,9 @@ private int tryOverwritePartition( null); } - private CommitChanges overwriteChanges( - List changes, - List indexFiles, - @Nullable Snapshot latestSnapshot, - @Nullable PartitionPredicate partitionFilter) { - List changesWithOverwrite = new ArrayList<>(); - List indexChangesWithOverwrite = new ArrayList<>(); - if (latestSnapshot != null) { - scan.withSnapshot(latestSnapshot) - .withPartitionFilter(partitionFilter) - .withKind(ScanMode.ALL); - if (numBucket != BucketMode.POSTPONE_BUCKET) { - // bucket = -2 can only be overwritten in postpone bucket tables - scan.withBucketFilter(bucket -> bucket >= 0); - } - List currentEntries = scan.plan().files(); - for (ManifestEntry entry : currentEntries) { - changesWithOverwrite.add( - ManifestEntry.create( - FileKind.DELETE, - entry.partition(), - entry.bucket(), - entry.totalBuckets(), - entry.file())); - } - - // collect index files - if (latestSnapshot.indexManifest() != null) { - List entries = - indexManifestFile.read(latestSnapshot.indexManifest()); - for (IndexManifestEntry entry : entries) { - if (partitionFilter == null || partitionFilter.test(entry.partition())) { - indexChangesWithOverwrite.add(entry.toDeleteEntry()); - } - } - } - } - changesWithOverwrite.addAll(changes); - indexChangesWithOverwrite.addAll(indexFiles); - return new CommitChanges(changesWithOverwrite, emptyList(), indexChangesWithOverwrite); - } - @VisibleForTesting CommitResult tryCommitOnce( - @Nullable RetryResult retryResult, + @Nullable RetryCommitResult retryResult, List deltaFiles, List changelogFiles, List indexFiles, @@ -1042,7 +880,7 @@ CommitResult tryCommitOnce( if (snapshot.commitUser().equals(commitUser) && snapshot.commitIdentifier() == identifier && snapshot.commitKind() == commitKind) { - return new SuccessResult(); + return new SuccessCommitResult(); } } } @@ -1085,7 +923,7 @@ CommitResult tryCommitOnce( if (retryResult != null && retryResult.latestSnapshot != null) { baseDataFiles = new ArrayList<>(retryResult.baseDataFiles); List incremental = - readIncrementalChanges( + scanner.readIncrementalChanges( retryResult.latestSnapshot, latestSnapshot, changedPartitions); if (!incremental.isEmpty()) { baseDataFiles.addAll(incremental); @@ -1093,7 +931,8 @@ CommitResult tryCommitOnce( } } else { baseDataFiles = - readAllEntriesFromChangedPartitions(latestSnapshot, changedPartitions); + scanner.readAllEntriesFromChangedPartitions( + latestSnapshot, changedPartitions); } if (discardDuplicate) { Set baseIdentifiers = @@ -1127,7 +966,7 @@ CommitResult tryCommitOnce( long previousTotalRecordCount = 0L; Long currentWatermark = watermark; if (latestSnapshot != null) { - previousTotalRecordCount = scan.totalRecordCount(latestSnapshot); + previousTotalRecordCount = scanner.totalRecordCount(latestSnapshot); // read all previous manifest files mergeBeforeManifests = manifestList.readDataManifests(latestSnapshot); // read the last snapshot to complete the bucket's offsets when logOffsets does not @@ -1232,9 +1071,10 @@ CommitResult tryCommitOnce( nextRowIdStart); } catch (Throwable e) { // fails when preparing for commit, we should clean up - cleanUpReuseTmpManifests( + commitCleaner.cleanUpReuseTmpManifests( deltaManifestList, changelogManifestList, oldIndexManifest, indexManifest); - cleanUpNoReuseTmpManifests(baseManifestList, mergeBeforeManifests, mergeAfterManifests); + commitCleaner.cleanUpNoReuseTmpManifests( + baseManifestList, mergeBeforeManifests, mergeAfterManifests); throw new RuntimeException( String.format( "Exception occurs when preparing snapshot #%d by user %s " @@ -1249,7 +1089,7 @@ CommitResult tryCommitOnce( } catch (Exception e) { // commit exception, not sure about the situation and should not clean up the files LOG.warn("Retry commit for exception.", e); - return new RetryResult(latestSnapshot, baseDataFiles, e); + return new RetryCommitResult(latestSnapshot, baseDataFiles, e); } if (!success) { @@ -1264,8 +1104,9 @@ CommitResult tryCommitOnce( identifier, commitKind.name(), commitTime); - cleanUpNoReuseTmpManifests(baseManifestList, mergeBeforeManifests, mergeAfterManifests); - return new RetryResult(latestSnapshot, baseDataFiles, null); + commitCleaner.cleanUpNoReuseTmpManifests( + baseManifestList, mergeBeforeManifests, mergeAfterManifests); + return new RetryCommitResult(latestSnapshot, baseDataFiles, null); } LOG.info( @@ -1284,7 +1125,7 @@ CommitResult tryCommitOnce( commitCallbacks.forEach( callback -> callback.call(finalBaseFiles, finalDeltaFiles, indexFiles, newSnapshot)); - return new SuccessResult(); + return new SuccessCommitResult(); } public boolean replaceManifestList( @@ -1317,7 +1158,7 @@ public boolean replaceManifestList( latest.properties(), latest.nextRowId()); - return commitSnapshotImpl(newSnapshot, Collections.emptyList()); + return commitSnapshotImpl(newSnapshot, emptyList()); } private long assignRowTrackingMeta( @@ -1463,104 +1304,13 @@ private boolean commitSnapshotImpl(Snapshot newSnapshot, List de + "with identifier %s and kind %s. " + "Cannot clean up because we can't determine the success.", newSnapshot.id(), - commitUser, + newSnapshot.commitUser(), newSnapshot.commitIdentifier(), newSnapshot.commitKind().name()), e); } } - private List readIncrementalChanges( - Snapshot from, Snapshot to, List changedPartitions) { - List entries = new ArrayList<>(); - for (long i = from.id() + 1; i <= to.id(); i++) { - List delta = - scan.withSnapshot(i) - .withKind(ScanMode.DELTA) - .withPartitionFilter(changedPartitions) - .readSimpleEntries(); - entries.addAll(delta); - } - return entries; - } - - private List changedPartitions( - List appendTableFiles, - List compactTableFiles, - List appendIndexFiles) { - Set changedPartitions = new HashSet<>(); - for (ManifestEntry appendTableFile : appendTableFiles) { - changedPartitions.add(appendTableFile.partition()); - } - for (ManifestEntry compactTableFile : compactTableFiles) { - changedPartitions.add(compactTableFile.partition()); - } - for (IndexManifestEntry appendIndexFile : appendIndexFiles) { - if (appendIndexFile.indexFile().indexType().equals(DELETION_VECTORS_INDEX)) { - changedPartitions.add(appendIndexFile.partition()); - } - } - return new ArrayList<>(changedPartitions); - } - - private List readAllEntriesFromChangedPartitions( - Snapshot snapshot, List changedPartitions) { - try { - return scan.withSnapshot(snapshot) - .withKind(ScanMode.ALL) - .withPartitionFilter(changedPartitions) - .readSimpleEntries(); - } catch (Throwable e) { - throw new RuntimeException("Cannot read manifest entries from changed partitions.", e); - } - } - - private void cleanUpNoReuseTmpManifests( - Pair baseManifestList, - List mergeBeforeManifests, - List mergeAfterManifests) { - if (baseManifestList != null) { - manifestList.delete(baseManifestList.getKey()); - } - Set oldMetaSet = - mergeBeforeManifests.stream() - .map(ManifestFileMeta::fileName) - .collect(Collectors.toSet()); - for (ManifestFileMeta suspect : mergeAfterManifests) { - if (!oldMetaSet.contains(suspect.fileName())) { - manifestFile.delete(suspect.fileName()); - } - } - } - - private void cleanUpReuseTmpManifests( - Pair deltaManifestList, - Pair changelogManifestList, - String oldIndexManifest, - String newIndexManifest) { - if (deltaManifestList != null) { - for (ManifestFileMeta manifest : manifestList.read(deltaManifestList.getKey())) { - manifestFile.delete(manifest.fileName()); - } - manifestList.delete(deltaManifestList.getKey()); - } - - if (changelogManifestList != null) { - for (ManifestFileMeta manifest : manifestList.read(changelogManifestList.getKey())) { - manifestFile.delete(manifest.fileName()); - } - manifestList.delete(changelogManifestList.getKey()); - } - - cleanIndexManifest(oldIndexManifest, newIndexManifest); - } - - private void cleanIndexManifest(String oldIndexManifest, String newIndexManifest) { - if (newIndexManifest != null && !Objects.equals(oldIndexManifest, newIndexManifest)) { - indexManifestFile.delete(newIndexManifest); - } - } - private void commitRetryWait(int retryCount) { int retryWait = (int) Math.min(commitMinRetryWait * Math.pow(2, retryCount), commitMaxRetryWait); @@ -1576,78 +1326,7 @@ private void commitRetryWait(int retryCount) { @Override public void close() { - for (CommitCallback callback : commitCallbacks) { - IOUtils.closeQuietly(callback); - } + IOUtils.closeAllQuietly(commitCallbacks); IOUtils.closeQuietly(snapshotCommit); } - - private interface CommitResult { - boolean isSuccess(); - } - - private static class SuccessResult implements CommitResult { - - @Override - public boolean isSuccess() { - return true; - } - } - - @VisibleForTesting - static class RetryResult implements CommitResult { - - private final Snapshot latestSnapshot; - private final List baseDataFiles; - private final Exception exception; - - public RetryResult( - Snapshot latestSnapshot, List baseDataFiles, Exception exception) { - this.latestSnapshot = latestSnapshot; - this.baseDataFiles = baseDataFiles; - this.exception = exception; - } - - @Override - public boolean isSuccess() { - return false; - } - } - - private static ChangesProvider provider( - List tableFiles, - List changelogFiles, - List indexFiles) { - return s -> new CommitChanges(tableFiles, changelogFiles, indexFiles); - } - - @FunctionalInterface - private interface ChangesProvider { - CommitChanges provide(@Nullable Snapshot latestSnapshot); - } - - private static class CommitChanges { - - private final List tableFiles; - private final List changelogFiles; - private final List indexFiles; - - private CommitChanges( - List tableFiles, - List changelogFiles, - List indexFiles) { - this.tableFiles = tableFiles; - this.changelogFiles = changelogFiles; - this.indexFiles = indexFiles; - } - } - - @FunctionalInterface - private interface CommitKindProvider { - CommitKind provide(CommitChanges changes); - } - - private static CommitKindProvider provider(CommitKind kind) { - return changes -> kind; - } } diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/commit/CommitChanges.java b/paimon-core/src/main/java/org/apache/paimon/operation/commit/CommitChanges.java new file mode 100644 index 000000000000..2144040d178e --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/operation/commit/CommitChanges.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.operation.commit; + +import org.apache.paimon.manifest.IndexManifestEntry; +import org.apache.paimon.manifest.ManifestEntry; + +import java.util.List; + +/** Commit changes. */ +public class CommitChanges { + + public final List tableFiles; + public final List changelogFiles; + public final List indexFiles; + + public CommitChanges( + List tableFiles, + List changelogFiles, + List indexFiles) { + this.tableFiles = tableFiles; + this.changelogFiles = changelogFiles; + this.indexFiles = indexFiles; + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/commit/CommitChangesProvider.java b/paimon-core/src/main/java/org/apache/paimon/operation/commit/CommitChangesProvider.java new file mode 100644 index 000000000000..682198d4be59 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/operation/commit/CommitChangesProvider.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.operation.commit; + +import org.apache.paimon.Snapshot; +import org.apache.paimon.manifest.IndexManifestEntry; +import org.apache.paimon.manifest.ManifestEntry; + +import javax.annotation.Nullable; + +import java.util.List; + +/** Provider to provide {@link CommitChanges}. */ +@FunctionalInterface +public interface CommitChangesProvider { + + CommitChanges provide(@Nullable Snapshot latestSnapshot); + + static CommitChangesProvider provider( + List tableFiles, + List changelogFiles, + List indexFiles) { + return s -> new CommitChanges(tableFiles, changelogFiles, indexFiles); + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/commit/CommitCleaner.java b/paimon-core/src/main/java/org/apache/paimon/operation/commit/CommitCleaner.java new file mode 100644 index 000000000000..a24b5c4c6e9b --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/operation/commit/CommitCleaner.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.operation.commit; + +import org.apache.paimon.manifest.IndexManifestFile; +import org.apache.paimon.manifest.ManifestFile; +import org.apache.paimon.manifest.ManifestFileMeta; +import org.apache.paimon.manifest.ManifestList; +import org.apache.paimon.utils.Pair; + +import java.util.List; +import java.util.Objects; +import java.util.Set; +import java.util.stream.Collectors; + +/** A cleaner to clean commit tmp files. */ +public class CommitCleaner { + + private final ManifestList manifestList; + private final ManifestFile manifestFile; + private final IndexManifestFile indexManifestFile; + + public CommitCleaner( + ManifestList manifestList, + ManifestFile manifestFile, + IndexManifestFile indexManifestFile) { + this.manifestList = manifestList; + this.manifestFile = manifestFile; + this.indexManifestFile = indexManifestFile; + } + + public void cleanUpReuseTmpManifests( + Pair deltaManifestList, + Pair changelogManifestList, + String oldIndexManifest, + String newIndexManifest) { + if (deltaManifestList != null) { + for (ManifestFileMeta manifest : manifestList.read(deltaManifestList.getKey())) { + manifestFile.delete(manifest.fileName()); + } + manifestList.delete(deltaManifestList.getKey()); + } + + if (changelogManifestList != null) { + for (ManifestFileMeta manifest : manifestList.read(changelogManifestList.getKey())) { + manifestFile.delete(manifest.fileName()); + } + manifestList.delete(changelogManifestList.getKey()); + } + + cleanIndexManifest(oldIndexManifest, newIndexManifest); + } + + public void cleanUpNoReuseTmpManifests( + Pair baseManifestList, + List mergeBeforeManifests, + List mergeAfterManifests) { + if (baseManifestList != null) { + manifestList.delete(baseManifestList.getKey()); + } + Set oldMetaSet = + mergeBeforeManifests.stream() + .map(ManifestFileMeta::fileName) + .collect(Collectors.toSet()); + for (ManifestFileMeta suspect : mergeAfterManifests) { + if (!oldMetaSet.contains(suspect.fileName())) { + manifestFile.delete(suspect.fileName()); + } + } + } + + private void cleanIndexManifest(String oldIndexManifest, String newIndexManifest) { + if (newIndexManifest != null && !Objects.equals(oldIndexManifest, newIndexManifest)) { + indexManifestFile.delete(newIndexManifest); + } + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/commit/CommitKindProvider.java b/paimon-core/src/main/java/org/apache/paimon/operation/commit/CommitKindProvider.java new file mode 100644 index 000000000000..053593ef415e --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/operation/commit/CommitKindProvider.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.operation.commit; + +import org.apache.paimon.Snapshot.CommitKind; + +/** Provider to provide {@link CommitKind}. */ +@FunctionalInterface +public interface CommitKindProvider { + + CommitKind provide(CommitChanges changes); + + static CommitKindProvider provider(CommitKind kind) { + return changes -> kind; + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/commit/CommitResult.java b/paimon-core/src/main/java/org/apache/paimon/operation/commit/CommitResult.java new file mode 100644 index 000000000000..3c0136bf05df --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/operation/commit/CommitResult.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.operation.commit; + +/** Result of a commit. */ +public interface CommitResult { + boolean isSuccess(); +} diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/commit/CommitScanner.java b/paimon-core/src/main/java/org/apache/paimon/operation/commit/CommitScanner.java new file mode 100644 index 000000000000..0a93d2b92023 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/operation/commit/CommitScanner.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.operation.commit; + +import org.apache.paimon.CoreOptions; +import org.apache.paimon.Snapshot; +import org.apache.paimon.data.BinaryRow; +import org.apache.paimon.manifest.FileKind; +import org.apache.paimon.manifest.IndexManifestEntry; +import org.apache.paimon.manifest.IndexManifestFile; +import org.apache.paimon.manifest.ManifestEntry; +import org.apache.paimon.manifest.SimpleFileEntry; +import org.apache.paimon.operation.FileStoreScan; +import org.apache.paimon.partition.PartitionPredicate; +import org.apache.paimon.table.BucketMode; +import org.apache.paimon.table.source.ScanMode; + +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.List; + +import static java.util.Collections.emptyList; + +/** Manifest entries scanner for commit. */ +public class CommitScanner { + + private final FileStoreScan scan; + private final IndexManifestFile indexManifestFile; + + public CommitScanner( + FileStoreScan scan, IndexManifestFile indexManifestFile, CoreOptions options) { + this.scan = scan; + this.indexManifestFile = indexManifestFile; + // Stats in DELETE Manifest Entries is useless + if (options.manifestDeleteFileDropStats()) { + this.scan.dropStats(); + } + } + + public List readIncrementalChanges( + Snapshot from, Snapshot to, List changedPartitions) { + List entries = new ArrayList<>(); + for (long i = from.id() + 1; i <= to.id(); i++) { + List delta = + scan.withSnapshot(i) + .withKind(ScanMode.DELTA) + .withPartitionFilter(changedPartitions) + .readSimpleEntries(); + entries.addAll(delta); + } + return entries; + } + + public List readAllEntriesFromChangedPartitions( + Snapshot snapshot, List changedPartitions) { + try { + return scan.withSnapshot(snapshot) + .withKind(ScanMode.ALL) + .withPartitionFilter(changedPartitions) + .readSimpleEntries(); + } catch (Throwable e) { + throw new RuntimeException("Cannot read manifest entries from changed partitions.", e); + } + } + + public CommitChanges readOverwriteChanges( + int numBucket, + List changes, + List indexFiles, + @Nullable Snapshot latestSnapshot, + @Nullable PartitionPredicate partitionFilter) { + List changesWithOverwrite = new ArrayList<>(); + List indexChangesWithOverwrite = new ArrayList<>(); + if (latestSnapshot != null) { + scan.withSnapshot(latestSnapshot) + .withPartitionFilter(partitionFilter) + .withKind(ScanMode.ALL); + if (numBucket != BucketMode.POSTPONE_BUCKET) { + // bucket = -2 can only be overwritten in postpone bucket tables + scan.withBucketFilter(bucket -> bucket >= 0); + } + List currentEntries = scan.plan().files(); + for (ManifestEntry entry : currentEntries) { + changesWithOverwrite.add( + ManifestEntry.create( + FileKind.DELETE, + entry.partition(), + entry.bucket(), + entry.totalBuckets(), + entry.file())); + } + + // collect index files + if (latestSnapshot.indexManifest() != null) { + List entries = + indexManifestFile.read(latestSnapshot.indexManifest()); + for (IndexManifestEntry entry : entries) { + if (partitionFilter == null || partitionFilter.test(entry.partition())) { + indexChangesWithOverwrite.add(entry.toDeleteEntry()); + } + } + } + } + changesWithOverwrite.addAll(changes); + indexChangesWithOverwrite.addAll(indexFiles); + return new CommitChanges(changesWithOverwrite, emptyList(), indexChangesWithOverwrite); + } + + public long totalRecordCount(Snapshot snapshot) { + return scan.totalRecordCount(snapshot); + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/commit/ManifestEntryChanges.java b/paimon-core/src/main/java/org/apache/paimon/operation/commit/ManifestEntryChanges.java new file mode 100644 index 000000000000..794445dfbbd7 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/operation/commit/ManifestEntryChanges.java @@ -0,0 +1,183 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.operation.commit; + +import org.apache.paimon.data.BinaryRow; +import org.apache.paimon.io.DataFileMeta; +import org.apache.paimon.manifest.FileKind; +import org.apache.paimon.manifest.IndexManifestEntry; +import org.apache.paimon.manifest.ManifestEntry; +import org.apache.paimon.table.sink.CommitMessage; +import org.apache.paimon.table.sink.CommitMessageImpl; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import static org.apache.paimon.deletionvectors.DeletionVectorsIndexFile.DELETION_VECTORS_INDEX; + +/** Detailed changes from {@link CommitMessage}s. */ +public class ManifestEntryChanges { + + private final int defaultNumBucket; + + public List appendTableFiles; + public List appendChangelog; + public List appendIndexFiles; + public List compactTableFiles; + public List compactChangelog; + public List compactIndexFiles; + + public ManifestEntryChanges(int defaultNumBucket) { + this.defaultNumBucket = defaultNumBucket; + this.appendTableFiles = new ArrayList<>(); + this.appendChangelog = new ArrayList<>(); + this.appendIndexFiles = new ArrayList<>(); + this.compactTableFiles = new ArrayList<>(); + this.compactChangelog = new ArrayList<>(); + this.compactIndexFiles = new ArrayList<>(); + } + + public void collect(CommitMessage message) { + CommitMessageImpl commitMessage = (CommitMessageImpl) message; + commitMessage + .newFilesIncrement() + .newFiles() + .forEach(m -> appendTableFiles.add(makeEntry(FileKind.ADD, commitMessage, m))); + commitMessage + .newFilesIncrement() + .deletedFiles() + .forEach(m -> appendTableFiles.add(makeEntry(FileKind.DELETE, commitMessage, m))); + commitMessage + .newFilesIncrement() + .changelogFiles() + .forEach(m -> appendChangelog.add(makeEntry(FileKind.ADD, commitMessage, m))); + commitMessage + .newFilesIncrement() + .deletedIndexFiles() + .forEach( + m -> + appendIndexFiles.add( + new IndexManifestEntry( + FileKind.DELETE, + commitMessage.partition(), + commitMessage.bucket(), + m))); + commitMessage + .newFilesIncrement() + .newIndexFiles() + .forEach( + m -> + appendIndexFiles.add( + new IndexManifestEntry( + FileKind.ADD, + commitMessage.partition(), + commitMessage.bucket(), + m))); + + commitMessage + .compactIncrement() + .compactBefore() + .forEach(m -> compactTableFiles.add(makeEntry(FileKind.DELETE, commitMessage, m))); + commitMessage + .compactIncrement() + .compactAfter() + .forEach(m -> compactTableFiles.add(makeEntry(FileKind.ADD, commitMessage, m))); + commitMessage + .compactIncrement() + .changelogFiles() + .forEach(m -> compactChangelog.add(makeEntry(FileKind.ADD, commitMessage, m))); + commitMessage + .compactIncrement() + .deletedIndexFiles() + .forEach( + m -> + compactIndexFiles.add( + new IndexManifestEntry( + FileKind.DELETE, + commitMessage.partition(), + commitMessage.bucket(), + m))); + commitMessage + .compactIncrement() + .newIndexFiles() + .forEach( + m -> + compactIndexFiles.add( + new IndexManifestEntry( + FileKind.ADD, + commitMessage.partition(), + commitMessage.bucket(), + m))); + } + + private ManifestEntry makeEntry(FileKind kind, CommitMessage commitMessage, DataFileMeta file) { + Integer totalBuckets = commitMessage.totalBuckets(); + if (totalBuckets == null) { + totalBuckets = defaultNumBucket; + } + + return ManifestEntry.create( + kind, commitMessage.partition(), commitMessage.bucket(), totalBuckets, file); + } + + @Override + public String toString() { + List msg = new ArrayList<>(); + if (!appendTableFiles.isEmpty()) { + msg.add(appendTableFiles.size() + " append table files"); + } + if (!appendChangelog.isEmpty()) { + msg.add(appendChangelog.size() + " append Changelogs"); + } + if (!appendIndexFiles.isEmpty()) { + msg.add(appendIndexFiles.size() + " append index files"); + } + if (!compactTableFiles.isEmpty()) { + msg.add(compactTableFiles.size() + " compact table files"); + } + if (!compactChangelog.isEmpty()) { + msg.add(compactChangelog.size() + " compact Changelogs"); + } + if (!compactIndexFiles.isEmpty()) { + msg.add(compactIndexFiles.size() + " compact index files"); + } + return String.join(", ", msg); + } + + public static List changedPartitions( + List appendTableFiles, + List compactTableFiles, + List appendIndexFiles) { + Set changedPartitions = new HashSet<>(); + for (ManifestEntry appendTableFile : appendTableFiles) { + changedPartitions.add(appendTableFile.partition()); + } + for (ManifestEntry compactTableFile : compactTableFiles) { + changedPartitions.add(compactTableFile.partition()); + } + for (IndexManifestEntry appendIndexFile : appendIndexFiles) { + if (appendIndexFile.indexFile().indexType().equals(DELETION_VECTORS_INDEX)) { + changedPartitions.add(appendIndexFile.partition()); + } + } + return new ArrayList<>(changedPartitions); + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/commit/RetryCommitResult.java b/paimon-core/src/main/java/org/apache/paimon/operation/commit/RetryCommitResult.java new file mode 100644 index 000000000000..e64049ea63c2 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/operation/commit/RetryCommitResult.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.operation.commit; + +import org.apache.paimon.Snapshot; +import org.apache.paimon.manifest.SimpleFileEntry; + +import java.util.List; + +/** Need to retry commit of {@link CommitResult}. */ +public class RetryCommitResult implements CommitResult { + + public final Snapshot latestSnapshot; + public final List baseDataFiles; + public final Exception exception; + + public RetryCommitResult( + Snapshot latestSnapshot, List baseDataFiles, Exception exception) { + this.latestSnapshot = latestSnapshot; + this.baseDataFiles = baseDataFiles; + this.exception = exception; + } + + @Override + public boolean isSuccess() { + return false; + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/commit/SuccessCommitResult.java b/paimon-core/src/main/java/org/apache/paimon/operation/commit/SuccessCommitResult.java new file mode 100644 index 000000000000..15534365ff69 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/operation/commit/SuccessCommitResult.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.operation.commit; + +/** Success {@link CommitResult}. */ +public class SuccessCommitResult implements CommitResult { + + @Override + public boolean isSuccess() { + return true; + } +} diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java index 2c1f6c50eaab..ec17d7e4c671 100644 --- a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java @@ -38,7 +38,7 @@ import org.apache.paimon.manifest.ManifestFile; import org.apache.paimon.manifest.ManifestFileMeta; import org.apache.paimon.mergetree.compact.DeduplicateMergeFunction; -import org.apache.paimon.operation.FileStoreCommitImpl.RetryResult; +import org.apache.paimon.operation.commit.RetryCommitResult; import org.apache.paimon.predicate.PredicateBuilder; import org.apache.paimon.schema.Schema; import org.apache.paimon.schema.SchemaManager; @@ -1073,7 +1073,7 @@ public void testCommitTwiceWithDifferentKind() throws Exception { null); // Compact commit.tryCommitOnce( - new RetryResult(firstLatest, Collections.emptyList(), null), + new RetryCommitResult(firstLatest, Collections.emptyList(), null), Collections.emptyList(), Collections.emptyList(), Collections.emptyList(),