From 5d5228db519f0cc615c4955ba36b9f3ee0572788 Mon Sep 17 00:00:00 2001 From: Mehakmeet Singh Date: Wed, 18 Jan 2023 15:54:28 +0530 Subject: [PATCH 1/9] HADOOP-18596. Distcp -update to use modification time while checking for file skip. --- .../hadoop/tools/mapred/CopyMapper.java | 9 +- .../contract/AbstractContractDistCpTest.java | 88 +++++++++++++++++++ 2 files changed, 96 insertions(+), 1 deletion(-) diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyMapper.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyMapper.java index c1a11ef091ba2..34395b818737e 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyMapper.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyMapper.java @@ -354,7 +354,14 @@ private boolean canSkip(FileSystem sourceFS, CopyListingFileStatus source, boolean sameLength = target.getLen() == source.getLen(); boolean sameBlockSize = source.getBlockSize() == target.getBlockSize() || !preserve.contains(FileAttribute.BLOCKSIZE); - if (sameLength && sameBlockSize) { + // checksum check to be done if same file len(greater than 0), same block + // size and the target file has been updated more recently than the source + // file. + // Note: For Different cloud stores with different checksum algorithms, + // checksum comparisons are not performed so we would be depending on the + // file size and modification time. + if (sameLength && (source.getLen() > 0) && sameBlockSize && + source.getModificationTime() < target.getModificationTime()) { return skipCrc || DistCpUtils.checksumsAreEqual(sourceFS, source.getPath(), null, targetFS, target.getPath(), source.getLen()); diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/contract/AbstractContractDistCpTest.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/contract/AbstractContractDistCpTest.java index 8545df30bac8e..e668cea9851d6 100644 --- a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/contract/AbstractContractDistCpTest.java +++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/contract/AbstractContractDistCpTest.java @@ -24,11 +24,17 @@ import static org.apache.hadoop.tools.DistCpConstants.CONF_LABEL_DISTCP_JOB_ID; import java.io.IOException; +import java.nio.file.Files; +import java.time.Instant; +import java.time.LocalDate; +import java.time.LocalDateTime; import java.util.Collections; import java.util.HashMap; import java.util.Map; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; @@ -72,6 +78,9 @@ public abstract class AbstractContractDistCpTest private static final Logger LOG = LoggerFactory.getLogger(AbstractContractDistCpTest.class); + /** Using offset to change modification time in tests. 
*/ + private static final long MODIFICATION_TIME_OFFSET = 10000; + public static final String SCALE_TEST_DISTCP_FILE_SIZE_KB = "scale.test.distcp.file.size.kb"; @@ -857,4 +866,83 @@ public void testDistCpWithUpdateExistFile() throws Exception { verifyFileContents(localFS, dest, block); } + @Test + public void testDistCpUpdateCheckFileSkip() throws Exception { + describe("Distcp update to check file skips."); + + Path source = new Path(remoteDir, "file"); + Path dest = new Path(localDir, "file"); + dest = localFS.makeQualified(dest); + + // Creating a source file with certain dataset. + byte[] sourceBlock = dataset(10, 'a', 'z'); + + // Write the dataset and as well create the target path. + try (FSDataOutputStream out = remoteFS.create(source)) { + out.write(sourceBlock); + localFS.create(dest); + } + + verifyPathExists(remoteFS, "", source); + verifyPathExists(localFS, "", dest); + DistCpTestUtils + .assertRunDistCp(DistCpConstants.SUCCESS, remoteDir.toString(), + localDir.toString(), "-delete -update" + getDefaultCLIOptions(), + conf); + + // First distcp -update would normally copy the source to dest. + verifyFileContents(localFS, dest, sourceBlock); + + // Remove the source file and replace with a file with same name and size + // but different content. + remoteFS.delete(source, false); + Path updatedSource = new Path(remoteDir, "file"); + byte[] updatedSourceBlock = dataset(10, 'b', 'z'); + try (FSDataOutputStream out = remoteFS.create(updatedSource)) { + out.write(updatedSourceBlock); + } + + // For testing purposes we would take the modification time of the + // updated Source file and add an offset or subtract the offset and set + // that time as the modification time for target file, this way we can + // ensure that our test can emulate a scenario where source is either more + // recently changed after -update so that copy takes place or target file + // is more recently changed which would skip the copying since the source + // has not been recently updated. + FileStatus fsSourceUpd = remoteFS.getFileStatus(updatedSource); + long modTimeSourceUpd = fsSourceUpd.getModificationTime(); + + // Add by an offset which would ensure enough gap for the test to + // not fail due to race conditions. + long newTargetModTimeNew = modTimeSourceUpd + MODIFICATION_TIME_OFFSET; + localFS.setTimes(dest, newTargetModTimeNew, -1); + + DistCpTestUtils + .assertRunDistCp(DistCpConstants.SUCCESS, remoteDir.toString(), + localDir.toString(), "-delete -update" + getDefaultCLIOptions(), + conf); + + // File contents should remain same since the mod time for target is + // newer than the updatedSource which indicates that the sync happened + // more recently and there is no update. + verifyFileContents(localFS, dest, sourceBlock); + + // Subtract by an offset which would ensure enough gap for the test to + // not fail due to race conditions. + long newTargetModTimeOld = + Math.min(modTimeSourceUpd - MODIFICATION_TIME_OFFSET, 0); + localFS.setTimes(dest, newTargetModTimeOld, -1); + + DistCpTestUtils + .assertRunDistCp(DistCpConstants.SUCCESS, remoteDir.toString(), + localDir.toString(), "-delete -update" + getDefaultCLIOptions(), + conf); + + Assertions.assertThat(RemoteIterators.toList(localFS.listFiles(dest, true))) + .hasSize(1); + // Now the copy should take place and the file contents should change + // since the mod time for target is older than the source file indicating + // that there was an update to the source after the last sync took place. 
+ verifyFileContents(localFS, dest, updatedSourceBlock); + } } \ No newline at end of file From ee9a8568ae9e97cb94a05bbe1b2191811e0d45ee Mon Sep 17 00:00:00 2001 From: Mehakmeet Singh Date: Mon, 23 Jan 2023 13:42:57 +0530 Subject: [PATCH 2/9] HADOOP-18596. making mod time usage configurable --- .../apache/hadoop/tools/DistCpConstants.java | 7 ++++ .../hadoop/tools/mapred/CopyMapper.java | 39 +++++++++++++------ .../contract/AbstractContractDistCpTest.java | 4 -- 3 files changed, 35 insertions(+), 15 deletions(-) diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java index 289d552b86219..1908cba7b54f9 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java @@ -142,6 +142,13 @@ private DistCpConstants() { "distcp.blocks.per.chunk"; public static final String CONF_LABEL_USE_ITERATOR = "distcp.use.iterator"; + + /** Distcp -update to use modification time of source and target file to + * check while skipping. + */ + public static final String CONF_LABEL_UPDATE_MOD_TIME = + "distcp.update.modification.time"; + /** * Constants for DistCp return code to shell / consumer of ToolRunner's run */ diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyMapper.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyMapper.java index 34395b818737e..42a9783972a82 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyMapper.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyMapper.java @@ -85,6 +85,7 @@ static enum FileAction { private boolean append = false; private boolean verboseLog = false; private boolean directWrite = false; + private boolean useModTimeToUpdate = true; private EnumSet preserve = EnumSet.noneOf(FileAttribute.class); private FileSystem targetFS = null; @@ -114,6 +115,8 @@ public void setup(Context context) throws IOException, InterruptedException { PRESERVE_STATUS.getConfigLabel())); directWrite = conf.getBoolean( DistCpOptionSwitch.DIRECT_WRITE.getConfigLabel(), false); + useModTimeToUpdate = + conf.getBoolean(DistCpConstants.CONF_LABEL_UPDATE_MOD_TIME, true); targetWorkPath = new Path(conf.get(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH)); Path targetFinalPath = new Path(conf.get( @@ -354,17 +357,31 @@ private boolean canSkip(FileSystem sourceFS, CopyListingFileStatus source, boolean sameLength = target.getLen() == source.getLen(); boolean sameBlockSize = source.getBlockSize() == target.getBlockSize() || !preserve.contains(FileAttribute.BLOCKSIZE); - // checksum check to be done if same file len(greater than 0), same block - // size and the target file has been updated more recently than the source - // file. - // Note: For Different cloud stores with different checksum algorithms, - // checksum comparisons are not performed so we would be depending on the - // file size and modification time. 
- if (sameLength && (source.getLen() > 0) && sameBlockSize && - source.getModificationTime() < target.getModificationTime()) { - return skipCrc || - DistCpUtils.checksumsAreEqual(sourceFS, source.getPath(), null, - targetFS, target.getPath(), source.getLen()); + if (source.getLen() == 0) { + return false; + } + // if both the source and target have the same length, then check if the + // config to use modification time is set to true, then use the + // modification time and checksum comparison to determine if the copy can + // be skipped else if not set then just use the checksum comparison to + // check copy skip. + // + // Note: Different object stores can have different checksum algorithms + // resulting in no checksum comparison that results in return true + // always, having the modification time enabled can help in these + // scenarios to not incorrectly skip a copy. Refer: HADOOP-18596. + if (sameLength && sameBlockSize) { + if (useModTimeToUpdate) { + return + (source.getModificationTime() < target.getModificationTime()) && + (skipCrc || DistCpUtils.checksumsAreEqual(sourceFS, + source.getPath(), null, + targetFS, target.getPath(), source.getLen())); + } else { + return skipCrc || DistCpUtils + .checksumsAreEqual(sourceFS, source.getPath(), null, + targetFS, target.getPath(), source.getLen()); + } } else { return false; } diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/contract/AbstractContractDistCpTest.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/contract/AbstractContractDistCpTest.java index e668cea9851d6..c412683ddf3b2 100644 --- a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/contract/AbstractContractDistCpTest.java +++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/contract/AbstractContractDistCpTest.java @@ -24,10 +24,6 @@ import static org.apache.hadoop.tools.DistCpConstants.CONF_LABEL_DISTCP_JOB_ID; import java.io.IOException; -import java.nio.file.Files; -import java.time.Instant; -import java.time.LocalDate; -import java.time.LocalDateTime; import java.util.Collections; import java.util.HashMap; import java.util.Map; From d23f13be729f0fe0770ce856b4f676f6782d83b9 Mon Sep 17 00:00:00 2001 From: Mehakmeet Singh Date: Mon, 30 Jan 2023 14:22:37 +0530 Subject: [PATCH 3/9] HADOOP-18596. javadocs and correct return --- .../main/java/org/apache/hadoop/tools/mapred/CopyMapper.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyMapper.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyMapper.java index 42a9783972a82..38cf17097f498 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyMapper.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyMapper.java @@ -357,8 +357,9 @@ private boolean canSkip(FileSystem sourceFS, CopyListingFileStatus source, boolean sameLength = target.getLen() == source.getLen(); boolean sameBlockSize = source.getBlockSize() == target.getBlockSize() || !preserve.contains(FileAttribute.BLOCKSIZE); - if (source.getLen() == 0) { - return false; + // Skip the copy if a 0 size file is being copied. 
+ if (sameLength && source.getLen() == 0) { + return true; } // if both the source and target have the same length, then check if the // config to use modification time is set to true, then use the From f094248e19b615310d4791e31d293b88ed6bde37 Mon Sep 17 00:00:00 2001 From: Mehakmeet Singh Date: Wed, 1 Feb 2023 15:23:51 +0530 Subject: [PATCH 4/9] HADOOP-18596. review comments --- .../apache/hadoop/tools/DistCpConstants.java | 9 +- .../contract/AbstractContractDistCpTest.java | 109 ++++++++++++++---- 2 files changed, 93 insertions(+), 25 deletions(-) diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java index 1908cba7b54f9..9b73550a9aff7 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java @@ -143,8 +143,13 @@ private DistCpConstants() { public static final String CONF_LABEL_USE_ITERATOR = "distcp.use.iterator"; - /** Distcp -update to use modification time of source and target file to - * check while skipping. + /** + * Enabling distcp -update to use modification time of source and target + * file to check while copying same file with same size but different content. + * + * The check would verify if the target file is perceived as older than the + * source then it indicates that the source has been recently updated and it + * is a newer version than what was synced, so we should not skip the copy. */ public static final String CONF_LABEL_UPDATE_MOD_TIME = "distcp.update.modification.time"; diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/contract/AbstractContractDistCpTest.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/contract/AbstractContractDistCpTest.java index c412683ddf3b2..f8317fbc38121 100644 --- a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/contract/AbstractContractDistCpTest.java +++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/contract/AbstractContractDistCpTest.java @@ -50,6 +50,7 @@ import org.apache.hadoop.tools.mapred.CopyMapper; import org.apache.hadoop.tools.util.DistCpTestUtils; import org.apache.hadoop.util.functional.RemoteIterators; +import org.apache.http.annotation.Contract; import org.assertj.core.api.Assertions; import org.junit.Before; @@ -359,6 +360,29 @@ private Job distCpUpdate(final Path srcDir, final Path destDir) .withOverwrite(false))); } + /** + * Run distcp -update srcDir destDir. + * @param srcDir local source directory + * @param destDir remote destination directory. + * @return the completed job + * @throws Exception any failure. + */ + private Job distCpUpdateWithFs(final Path srcDir, final Path destDir, + FileSystem sourceFs, FileSystem targetFs) + throws Exception { + describe("\nDistcp -update from " + srcDir + " to " + destDir); + lsR("Source Fs to update", sourceFs, srcDir); + lsR("Target Fs before update", targetFs, destDir); + return runDistCp(buildWithStandardOptions( + new DistCpOptions.Builder( + Collections.singletonList(srcDir), destDir) + .withDeleteMissing(true) + .withSyncFolder(true) + .withSkipCRC(true) + .withDirectWrite(shouldUseDirectWrite()) + .withOverwrite(false))); + } + /** * Update the source directories as various tests expect, * including adding a new file. 
@@ -868,35 +892,41 @@ public void testDistCpUpdateCheckFileSkip() throws Exception { Path source = new Path(remoteDir, "file"); Path dest = new Path(localDir, "file"); + + Path source_0byte = new Path(remoteDir, "file_0byte"); + Path dest_0byte = new Path(localDir, "file_0byte"); dest = localFS.makeQualified(dest); + dest_0byte = localFS.makeQualified(dest_0byte); // Creating a source file with certain dataset. byte[] sourceBlock = dataset(10, 'a', 'z'); // Write the dataset and as well create the target path. - try (FSDataOutputStream out = remoteFS.create(source)) { - out.write(sourceBlock); - localFS.create(dest); - } + ContractTestUtils.createFile(localFS, dest, true, sourceBlock); + ContractTestUtils + .writeDataset(remoteFS, source, sourceBlock, sourceBlock.length, + 1024, true); - verifyPathExists(remoteFS, "", source); - verifyPathExists(localFS, "", dest); - DistCpTestUtils - .assertRunDistCp(DistCpConstants.SUCCESS, remoteDir.toString(), - localDir.toString(), "-delete -update" + getDefaultCLIOptions(), - conf); + // Create 0 byte source and target files. + ContractTestUtils.createFile(remoteFS, source_0byte, true, new byte[0]); + ContractTestUtils.createFile(localFS, dest_0byte, true, new byte[0]); + + // Execute the distcp -update job. + Job job = distCpUpdateWithFs(remoteDir, localDir, remoteFS, localFS); // First distcp -update would normally copy the source to dest. verifyFileContents(localFS, dest, sourceBlock); + // Verify 1 file was skipped in the distcp -update(They 0 byte files). + // Verify 1 file was copied in the distcp -update(The new source file). + verifySkipAndCopyCounter(job, 1, 1); // Remove the source file and replace with a file with same name and size // but different content. remoteFS.delete(source, false); Path updatedSource = new Path(remoteDir, "file"); byte[] updatedSourceBlock = dataset(10, 'b', 'z'); - try (FSDataOutputStream out = remoteFS.create(updatedSource)) { - out.write(updatedSourceBlock); - } + ContractTestUtils.writeDataset(remoteFS, updatedSource, + updatedSourceBlock, updatedSourceBlock.length, 1024, true); // For testing purposes we would take the modification time of the // updated Source file and add an offset or subtract the offset and set @@ -913,15 +943,19 @@ public void testDistCpUpdateCheckFileSkip() throws Exception { long newTargetModTimeNew = modTimeSourceUpd + MODIFICATION_TIME_OFFSET; localFS.setTimes(dest, newTargetModTimeNew, -1); - DistCpTestUtils - .assertRunDistCp(DistCpConstants.SUCCESS, remoteDir.toString(), - localDir.toString(), "-delete -update" + getDefaultCLIOptions(), - conf); + // Execute the distcp -update job. + Job updatedSourceJobOldSrc = + distCpUpdateWithFs(remoteDir, localDir, remoteFS, + localFS); // File contents should remain same since the mod time for target is // newer than the updatedSource which indicates that the sync happened // more recently and there is no update. verifyFileContents(localFS, dest, sourceBlock); + // Skipped both 0 byte file and sourceFile(since mod time of target is + // older than the source it is perceived that source is of older version + // and we can skip it's copy). + verifySkipAndCopyCounter(updatedSourceJobOldSrc, 2, 0); // Subtract by an offset which would ensure enough gap for the test to // not fail due to race conditions. 
@@ -929,16 +963,45 @@ public void testDistCpUpdateCheckFileSkip() throws Exception { Math.min(modTimeSourceUpd - MODIFICATION_TIME_OFFSET, 0); localFS.setTimes(dest, newTargetModTimeOld, -1); - DistCpTestUtils - .assertRunDistCp(DistCpConstants.SUCCESS, remoteDir.toString(), - localDir.toString(), "-delete -update" + getDefaultCLIOptions(), - conf); + // Execute the distcp -update job. + Job updatedSourceJobNewSrc = distCpUpdateWithFs(remoteDir, localDir, + remoteFS, + localFS); - Assertions.assertThat(RemoteIterators.toList(localFS.listFiles(dest, true))) - .hasSize(1); + // Verifying the target directory have both 0 byte file and the content + // file. + Assertions + .assertThat(RemoteIterators.toList(localFS.listFiles(localDir, true))) + .hasSize(2); // Now the copy should take place and the file contents should change // since the mod time for target is older than the source file indicating // that there was an update to the source after the last sync took place. verifyFileContents(localFS, dest, updatedSourceBlock); + // Verifying we skipped the 0 byte file and copied the updated source + // file (since the modification time of the new source is older than the + // target now). + verifySkipAndCopyCounter(updatedSourceJobNewSrc, 1, 1); + } + + /** + * Method to check the skipped and copied counters of a distcp job. + * + * @param job job to check. + * @param skipExpectedValue expected skip counter value. + * @param copyExpectedValue expected copy counter value. + * @throws IOException throw in case of failures. + */ + private void verifySkipAndCopyCounter(Job job, + int skipExpectedValue, int copyExpectedValue) throws IOException { + // get the skip and copy counters from the job. + long skipActualValue = job.getCounters() + .findCounter(CopyMapper.Counter.SKIP).getValue(); + long copyActualValue = job.getCounters() + .findCounter(CopyMapper.Counter.COPY).getValue(); + // Verify if the actual values equals the expected ones. + assertEquals("Mismatch in COPY counter value", copyExpectedValue, + copyActualValue); + assertEquals("Mismatch in SKIP counter value", skipExpectedValue, + skipActualValue); } } \ No newline at end of file From 8c427bd8a37f76e61bf6c8d0d0e7cb364d3a9344 Mon Sep 17 00:00:00 2001 From: Mehakmeet Singh Date: Thu, 2 Feb 2023 17:56:43 +0530 Subject: [PATCH 5/9] HADOOP-18596. docs and checkstyle --- .../apache/hadoop/tools/DistCpConstants.java | 1 + .../src/site/markdown/DistCp.md.vm | 41 +++++++++++++++---- .../contract/AbstractContractDistCpTest.java | 18 ++++---- 3 files changed, 42 insertions(+), 18 deletions(-) diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java index 9b73550a9aff7..84541ac3a60ea 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java @@ -150,6 +150,7 @@ private DistCpConstants() { * The check would verify if the target file is perceived as older than the * source then it indicates that the source has been recently updated and it * is a newer version than what was synced, so we should not skip the copy. 
+ * {@value} */ public static final String CONF_LABEL_UPDATE_MOD_TIME = "distcp.update.modification.time"; diff --git a/hadoop-tools/hadoop-distcp/src/site/markdown/DistCp.md.vm b/hadoop-tools/hadoop-distcp/src/site/markdown/DistCp.md.vm index a86e41c6668fd..77915965bbd46 100644 --- a/hadoop-tools/hadoop-distcp/src/site/markdown/DistCp.md.vm +++ b/hadoop-tools/hadoop-distcp/src/site/markdown/DistCp.md.vm @@ -631,14 +631,39 @@ hadoop distcp -update -numListstatusThreads 20 \ Because object stores are slow to list files, consider setting the `-numListstatusThreads` option when performing a `-update` operation on a large directory tree (the limit is 40 threads). -When `DistCp -update` is used with object stores, -generally only the modification time and length of the individual files are compared, -not any checksums. The fact that most object stores do have valid timestamps -for directories is irrelevant; only the file timestamps are compared. -However, it is important to have the clock of the client computers close -to that of the infrastructure, so that timestamps are consistent between -the client/HDFS cluster and that of the object store. Otherwise, changed files may be -missed/copied too often. +When `DistCp -update` is used with object stores, generally only the +modification time and length of the individual files are compared, not any +checksums if the checksum algorithm between the two stores is different. + +* The `distcp -update` between two object stores with different checksum + algorithm compares the modification times of source and target files along + with the file size to determine whether to skip the file copy. The behavior + is controlled by the property `distcp.update.modification.time`, which is + set to true by default. If the source file is more recently modified than + the target file, it is assumed that the content has changed, and the file + should be updated. + We need to ensure that there is no clock skew between the machines. + The fact that most object stores do have valid timestamps for directories + is irrelevant; only the file timestamps are compared. However, it is + important to have the clock of the client computers close to that of the + infrastructure, so that timestamps are consistent between the client/HDFS + cluster and that of the object store. Otherwise, changed files may be + missed/copied too often. + +* `distcp.update.modification.time` can be used alongside the checksum check + in stores with same checksum algorithm as well. if set to true we check + both modification time and checksum between the files, but if this property + is set to false we only compare the checksum between the files to determine + if we should skip the copy or not. 
+ + To turn off, set this in your core-site.xml + +```xml + + distcp.update.modification.time + true + +``` **Notes** diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/contract/AbstractContractDistCpTest.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/contract/AbstractContractDistCpTest.java index f8317fbc38121..05ee831e3dae2 100644 --- a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/contract/AbstractContractDistCpTest.java +++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/contract/AbstractContractDistCpTest.java @@ -29,7 +29,6 @@ import java.util.Map; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.LocatedFileStatus; @@ -50,7 +49,6 @@ import org.apache.hadoop.tools.mapred.CopyMapper; import org.apache.hadoop.tools.util.DistCpTestUtils; import org.apache.hadoop.util.functional.RemoteIterators; -import org.apache.http.annotation.Contract; import org.assertj.core.api.Assertions; import org.junit.Before; @@ -893,10 +891,10 @@ public void testDistCpUpdateCheckFileSkip() throws Exception { Path source = new Path(remoteDir, "file"); Path dest = new Path(localDir, "file"); - Path source_0byte = new Path(remoteDir, "file_0byte"); - Path dest_0byte = new Path(localDir, "file_0byte"); + Path source0byte = new Path(remoteDir, "file_0byte"); + Path dest0byte = new Path(localDir, "file_0byte"); dest = localFS.makeQualified(dest); - dest_0byte = localFS.makeQualified(dest_0byte); + dest0byte = localFS.makeQualified(dest0byte); // Creating a source file with certain dataset. byte[] sourceBlock = dataset(10, 'a', 'z'); @@ -908,16 +906,16 @@ public void testDistCpUpdateCheckFileSkip() throws Exception { 1024, true); // Create 0 byte source and target files. - ContractTestUtils.createFile(remoteFS, source_0byte, true, new byte[0]); - ContractTestUtils.createFile(localFS, dest_0byte, true, new byte[0]); + ContractTestUtils.createFile(remoteFS, source0byte, true, new byte[0]); + ContractTestUtils.createFile(localFS, dest0byte, true, new byte[0]); // Execute the distcp -update job. Job job = distCpUpdateWithFs(remoteDir, localDir, remoteFS, localFS); // First distcp -update would normally copy the source to dest. verifyFileContents(localFS, dest, sourceBlock); - // Verify 1 file was skipped in the distcp -update(They 0 byte files). - // Verify 1 file was copied in the distcp -update(The new source file). + // Verify 1 file was skipped in the distcp -update (The 0 byte file). + // Verify 1 file was copied in the distcp -update (The new source file). verifySkipAndCopyCounter(job, 1, 1); // Remove the source file and replace with a file with same name and size @@ -952,7 +950,7 @@ public void testDistCpUpdateCheckFileSkip() throws Exception { // newer than the updatedSource which indicates that the sync happened // more recently and there is no update. verifyFileContents(localFS, dest, sourceBlock); - // Skipped both 0 byte file and sourceFile(since mod time of target is + // Skipped both 0 byte file and sourceFile (since mod time of target is // older than the source it is perceived that source is of older version // and we can skip it's copy). verifySkipAndCopyCounter(updatedSourceJobOldSrc, 2, 0); From 4ff7f36138039a8ed90a0fd20af0d7b32f5a752e Mon Sep 17 00:00:00 2001 From: Mehakmeet Singh Date: Wed, 8 Feb 2023 17:42:13 +0530 Subject: [PATCH 6/9] HADOOP-18596. 
enum return type + nits --- .../apache/hadoop/tools/DistCpConstants.java | 11 ++- .../hadoop/tools/mapred/CopyMapper.java | 74 +++++++++++++++---- .../apache/hadoop/tools/util/DistCpUtils.java | 31 ++++++-- .../src/site/markdown/DistCp.md.vm | 14 ++-- .../contract/AbstractContractDistCpTest.java | 2 +- .../tools/mapred/TestCopyCommitter.java | 6 +- 6 files changed, 101 insertions(+), 37 deletions(-) diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java index 84541ac3a60ea..6838d4f775753 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java @@ -144,8 +144,9 @@ private DistCpConstants() { public static final String CONF_LABEL_USE_ITERATOR = "distcp.use.iterator"; /** - * Enabling distcp -update to use modification time of source and target - * file to check while copying same file with same size but different content. + * Enabling {@code distcp -update} to use modification time of source and + * target file to check while copying same file with same size but + * different content. * * The check would verify if the target file is perceived as older than the * source then it indicates that the source has been recently updated and it @@ -155,6 +156,12 @@ private DistCpConstants() { public static final String CONF_LABEL_UPDATE_MOD_TIME = "distcp.update.modification.time"; + /** + * Default value for 'distcp.update.modification.time' configuration. + */ + public static final boolean CONF_LABEL_UPDATE_MOD_TIME_DEFAULT = + true; + /** * Constants for DistCp return code to shell / consumer of ToolRunner's run */ diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyMapper.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyMapper.java index 38cf17097f498..3c2c59e31660c 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyMapper.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyMapper.java @@ -41,6 +41,8 @@ import org.apache.hadoop.tools.util.DistCpUtils; import org.apache.hadoop.util.StringUtils; +import static org.apache.hadoop.tools.DistCpConstants.CONF_LABEL_UPDATE_MOD_TIME_DEFAULT; + /** * Mapper class that executes the DistCp copy operation. * Implements the o.a.h.mapreduce.Mapper interface. @@ -74,6 +76,15 @@ static enum FileAction { OVERWRITE, // Overwrite the whole file } + /** + * Indicates the checksum comparison result. + */ + public enum ChecksumComparison { + COMPATIBLE_AND_TRUE, // checksum compariosn is compatible and true. + COMPATIBLE_AND_FALSE, // checksum compariosn is compatible and false. + INCOMPATIBLE, // checksum compariosn is not compatible. 
+ } + private static Logger LOG = LoggerFactory.getLogger(CopyMapper.class); private Configuration conf; @@ -85,7 +96,7 @@ static enum FileAction { private boolean append = false; private boolean verboseLog = false; private boolean directWrite = false; - private boolean useModTimeToUpdate = true; + private boolean useModTimeToUpdate; private EnumSet preserve = EnumSet.noneOf(FileAttribute.class); private FileSystem targetFS = null; @@ -116,7 +127,8 @@ public void setup(Context context) throws IOException, InterruptedException { directWrite = conf.getBoolean( DistCpOptionSwitch.DIRECT_WRITE.getConfigLabel(), false); useModTimeToUpdate = - conf.getBoolean(DistCpConstants.CONF_LABEL_UPDATE_MOD_TIME, true); + conf.getBoolean(DistCpConstants.CONF_LABEL_UPDATE_MOD_TIME, + CONF_LABEL_UPDATE_MOD_TIME_DEFAULT); targetWorkPath = new Path(conf.get(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH)); Path targetFinalPath = new Path(conf.get( @@ -361,31 +373,61 @@ private boolean canSkip(FileSystem sourceFS, CopyListingFileStatus source, if (sameLength && source.getLen() == 0) { return true; } - // if both the source and target have the same length, then check if the - // config to use modification time is set to true, then use the - // modification time and checksum comparison to determine if the copy can - // be skipped else if not set then just use the checksum comparison to - // check copy skip. + // If the src and target file have same size and block size, we would + // check if the checkCrc flag is enabled or not. If enabled, and the + // modTime comparison is enabled then return true if target file is older + // than the source file, since this indicates that the target file is + // recently updated and the source is not changed more recently than the + // update, we can skip the copy else we would copy. + // If skipCrc flag is disabled, we would check the checksum comparison + // which is an enum representing 3 values, of which if the comparison + // returns NOT_COMPATIBLE, we'll try to check modtime again, else return + // the result of checksum comparison which are compatible(true or false). // // Note: Different object stores can have different checksum algorithms // resulting in no checksum comparison that results in return true // always, having the modification time enabled can help in these // scenarios to not incorrectly skip a copy. Refer: HADOOP-18596. + if (sameLength && sameBlockSize) { - if (useModTimeToUpdate) { - return - (source.getModificationTime() < target.getModificationTime()) && - (skipCrc || DistCpUtils.checksumsAreEqual(sourceFS, - source.getPath(), null, - targetFS, target.getPath(), source.getLen())); + if (skipCrc) { + return maybeUseModTimeToCompare(source, target); } else { - return skipCrc || DistCpUtils + ChecksumComparison checksumComparison = DistCpUtils .checksumsAreEqual(sourceFS, source.getPath(), null, targetFS, target.getPath(), source.getLen()); + LOG.debug("Result of checksum comparison between src {} and target " + + "{} : {}", source, target, checksumComparison); + if (checksumComparison.equals(ChecksumComparison.INCOMPATIBLE)) { + return maybeUseModTimeToCompare(source, target); + } + // if skipCrc is disabled and checksumComparison is compatible we + // need not check the mod time. + return checksumComparison + .equals(ChecksumComparison.COMPATIBLE_AND_TRUE); } - } else { - return false; } + return false; + } + + /** + * If the mod time comparison is enabled, check the mod time else return + * false. 
+ * Comparison: If the target file perceives to have greater mod time(older) + * than the source file, we can assume that there has been no new changes + * that occurred in the source file, hence we should return true to skip the + * copy of the file. + * @param source Source fileStatus. + * @param target Target fileStatus. + * @return boolean representing result of modTime check. + */ + private boolean maybeUseModTimeToCompare( + CopyListingFileStatus source, FileStatus target) { + if (useModTimeToUpdate) { + return source.getModificationTime() < target.getModificationTime(); + } + // if we cannot check mod time, return true (skip the copy). + return true; } @Override diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/DistCpUtils.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/DistCpUtils.java index 1af434e19f823..9f55bd23b7d8f 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/DistCpUtils.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/DistCpUtils.java @@ -41,6 +41,7 @@ import org.apache.hadoop.tools.CopyListingFileStatus; import org.apache.hadoop.tools.DistCpContext; import org.apache.hadoop.tools.DistCpOptions.FileAttribute; +import org.apache.hadoop.tools.mapred.CopyMapper; import org.apache.hadoop.tools.mapred.UniformSizeInputFormat; import org.apache.hadoop.util.StringUtils; @@ -568,10 +569,12 @@ public static String getStringDescriptionFor(long nBytes) { * and false otherwise. * @throws IOException if there's an exception while retrieving checksums. */ - public static boolean checksumsAreEqual(FileSystem sourceFS, Path source, - FileChecksum sourceChecksum, - FileSystem targetFS, - Path target, long sourceLen) + public static CopyMapper.ChecksumComparison checksumsAreEqual( + FileSystem sourceFS, + Path source, + FileChecksum sourceChecksum, + FileSystem targetFS, + Path target, long sourceLen) throws IOException { FileChecksum targetChecksum = null; try { @@ -585,8 +588,15 @@ public static boolean checksumsAreEqual(FileSystem sourceFS, Path source, } catch (IOException e) { LOG.error("Unable to retrieve checksum for " + source + " or " + target, e); } - return (sourceChecksum == null || targetChecksum == null || - sourceChecksum.equals(targetChecksum)); + // If the source or target checksum is null, that means there is no + // comparison that took place and return not compatible. + // else if matched, return compatible with the matched result. + if (sourceChecksum == null || targetChecksum == null) { + return CopyMapper.ChecksumComparison.INCOMPATIBLE; + } else if (sourceChecksum.equals(targetChecksum)) { + return CopyMapper.ChecksumComparison.COMPATIBLE_AND_TRUE; + } + return CopyMapper.ChecksumComparison.COMPATIBLE_AND_FALSE; } /** @@ -613,8 +623,13 @@ public static void compareFileLengthsAndChecksums(long srcLen, //At this point, src & dest lengths are same. if length==0, we skip checksum if ((srcLen != 0) && (!skipCrc)) { - if (!checksumsAreEqual(sourceFS, source, sourceChecksum, - targetFS, target, srcLen)) { + CopyMapper.ChecksumComparison + checksumComparison = checksumsAreEqual(sourceFS, source, sourceChecksum, + targetFS, target, srcLen); + // If Checksum comparison is false set it to false, else set to true. 
+ boolean checksumResult = !checksumComparison.equals( + CopyMapper.ChecksumComparison.COMPATIBLE_AND_FALSE); + if (!checksumResult) { StringBuilder errorMessage = new StringBuilder(DistCpConstants.CHECKSUM_MISMATCH_ERROR_MSG) .append(source).append(" and ").append(target).append("."); diff --git a/hadoop-tools/hadoop-distcp/src/site/markdown/DistCp.md.vm b/hadoop-tools/hadoop-distcp/src/site/markdown/DistCp.md.vm index 77915965bbd46..2d77619d5cf3a 100644 --- a/hadoop-tools/hadoop-distcp/src/site/markdown/DistCp.md.vm +++ b/hadoop-tools/hadoop-distcp/src/site/markdown/DistCp.md.vm @@ -650,18 +650,16 @@ checksums if the checksum algorithm between the two stores is different. cluster and that of the object store. Otherwise, changed files may be missed/copied too often. -* `distcp.update.modification.time` can be used alongside the checksum check - in stores with same checksum algorithm as well. if set to true we check - both modification time and checksum between the files, but if this property - is set to false we only compare the checksum between the files to determine - if we should skip the copy or not. - - To turn off, set this in your core-site.xml +* `distcp.update.modification.time` would only be used if either of the two + stores don't have checksum validation resulting in incompatible checksum + comparison between the two. Even if the property is set to true, it won't + be used if their is valid checksum comparison between the two stores. +To turn off the modification time check, set this in your core-site.xml ```xml distcp.update.modification.time - true + false ``` diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/contract/AbstractContractDistCpTest.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/contract/AbstractContractDistCpTest.java index 05ee831e3dae2..532abc2aa4060 100644 --- a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/contract/AbstractContractDistCpTest.java +++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/contract/AbstractContractDistCpTest.java @@ -376,7 +376,7 @@ private Job distCpUpdateWithFs(final Path srcDir, final Path destDir, Collections.singletonList(srcDir), destDir) .withDeleteMissing(true) .withSyncFolder(true) - .withSkipCRC(true) + .withSkipCRC(false) .withDirectWrite(shouldUseDirectWrite()) .withOverwrite(false))); } diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyCommitter.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyCommitter.java index 599f3ec2db61e..9e86088e49a16 100644 --- a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyCommitter.java +++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyCommitter.java @@ -562,9 +562,11 @@ private void testCommitWithChecksumMismatch(boolean skipCrc) Path sourcePath = new Path(sourceBase + srcFilename); CopyListingFileStatus sourceCurrStatus = new CopyListingFileStatus(fs.getFileStatus(sourcePath)); - Assert.assertFalse(DistCpUtils.checksumsAreEqual( + Assert.assertFalse(!DistCpUtils.checksumsAreEqual( fs, new Path(sourceBase + srcFilename), null, - fs, new Path(targetBase + srcFilename), sourceCurrStatus.getLen())); + fs, new Path(targetBase + srcFilename), + sourceCurrStatus.getLen()) + .equals(CopyMapper.ChecksumComparison.COMPATIBLE_AND_FALSE)); } catch(IOException exception) { if (skipCrc) { LOG.error("Unexpected exception is found", exception); From 
d64e6b66d4851b711c40dfaf0b9ffc2a8e5a24ac Mon Sep 17 00:00:00 2001 From: Mehakmeet Singh Date: Wed, 8 Feb 2023 17:48:20 +0530 Subject: [PATCH 7/9] HADOOP-18596. renaming enum. --- .../java/org/apache/hadoop/tools/mapred/CopyMapper.java | 9 ++++----- .../java/org/apache/hadoop/tools/util/DistCpUtils.java | 7 +++---- .../apache/hadoop/tools/mapred/TestCopyCommitter.java | 2 +- 3 files changed, 8 insertions(+), 10 deletions(-) diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyMapper.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyMapper.java index 3c2c59e31660c..c78afca74de11 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyMapper.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyMapper.java @@ -80,9 +80,9 @@ static enum FileAction { * Indicates the checksum comparison result. */ public enum ChecksumComparison { - COMPATIBLE_AND_TRUE, // checksum compariosn is compatible and true. - COMPATIBLE_AND_FALSE, // checksum compariosn is compatible and false. - INCOMPATIBLE, // checksum compariosn is not compatible. + TRUE, // checksum comparison is compatible and true. + FALSE, // checksum comparison is compatible and false. + INCOMPATIBLE, // checksum comparison is not compatible. } private static Logger LOG = LoggerFactory.getLogger(CopyMapper.class); @@ -403,8 +403,7 @@ private boolean canSkip(FileSystem sourceFS, CopyListingFileStatus source, } // if skipCrc is disabled and checksumComparison is compatible we // need not check the mod time. - return checksumComparison - .equals(ChecksumComparison.COMPATIBLE_AND_TRUE); + return checksumComparison.equals(ChecksumComparison.TRUE); } } return false; diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/DistCpUtils.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/DistCpUtils.java index 9f55bd23b7d8f..e77b2031a76db 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/DistCpUtils.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/DistCpUtils.java @@ -594,9 +594,9 @@ public static CopyMapper.ChecksumComparison checksumsAreEqual( if (sourceChecksum == null || targetChecksum == null) { return CopyMapper.ChecksumComparison.INCOMPATIBLE; } else if (sourceChecksum.equals(targetChecksum)) { - return CopyMapper.ChecksumComparison.COMPATIBLE_AND_TRUE; + return CopyMapper.ChecksumComparison.TRUE; } - return CopyMapper.ChecksumComparison.COMPATIBLE_AND_FALSE; + return CopyMapper.ChecksumComparison.FALSE; } /** @@ -627,8 +627,7 @@ public static void compareFileLengthsAndChecksums(long srcLen, checksumComparison = checksumsAreEqual(sourceFS, source, sourceChecksum, targetFS, target, srcLen); // If Checksum comparison is false set it to false, else set to true. 
- boolean checksumResult = !checksumComparison.equals( - CopyMapper.ChecksumComparison.COMPATIBLE_AND_FALSE); + boolean checksumResult = !checksumComparison.equals(CopyMapper.ChecksumComparison.FALSE); if (!checksumResult) { StringBuilder errorMessage = new StringBuilder(DistCpConstants.CHECKSUM_MISMATCH_ERROR_MSG) diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyCommitter.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyCommitter.java index 9e86088e49a16..a4847e60a5c9b 100644 --- a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyCommitter.java +++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyCommitter.java @@ -566,7 +566,7 @@ private void testCommitWithChecksumMismatch(boolean skipCrc) fs, new Path(sourceBase + srcFilename), null, fs, new Path(targetBase + srcFilename), sourceCurrStatus.getLen()) - .equals(CopyMapper.ChecksumComparison.COMPATIBLE_AND_FALSE)); + .equals(CopyMapper.ChecksumComparison.FALSE)); } catch(IOException exception) { if (skipCrc) { LOG.error("Unexpected exception is found", exception); From 0f63b45b01ad69d7ccc810a52b22dbcfbab4c0cc Mon Sep 17 00:00:00 2001 From: Mehakmeet Singh Date: Thu, 9 Feb 2023 11:53:40 +0530 Subject: [PATCH 8/9] HADOOP-18596. review comments. --- .../org/apache/hadoop/tools/mapred/CopyMapper.java | 11 ++++++----- .../apache/hadoop/tools/mapred/TestCopyCommitter.java | 11 ++++++----- 2 files changed, 12 insertions(+), 10 deletions(-) diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyMapper.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyMapper.java index c78afca74de11..ad17e574ca9b8 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyMapper.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyMapper.java @@ -412,10 +412,11 @@ private boolean canSkip(FileSystem sourceFS, CopyListingFileStatus source, /** * If the mod time comparison is enabled, check the mod time else return * false. - * Comparison: If the target file perceives to have greater mod time(older) - * than the source file, we can assume that there has been no new changes - * that occurred in the source file, hence we should return true to skip the - * copy of the file. + * Comparison: If the target file perceives to have greater or equal mod time + * (older) than the source file, we can assume that there has been no new + * changes that occurred in the source file, hence we should return true to + * skip the copy of the file. + * * @param source Source fileStatus. * @param target Target fileStatus. * @return boolean representing result of modTime check. @@ -423,7 +424,7 @@ private boolean canSkip(FileSystem sourceFS, CopyListingFileStatus source, private boolean maybeUseModTimeToCompare( CopyListingFileStatus source, FileStatus target) { if (useModTimeToUpdate) { - return source.getModificationTime() < target.getModificationTime(); + return source.getModificationTime() <= target.getModificationTime(); } // if we cannot check mod time, return true (skip the copy). 
return true; diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyCommitter.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyCommitter.java index a4847e60a5c9b..f48608c89bddb 100644 --- a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyCommitter.java +++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyCommitter.java @@ -562,11 +562,12 @@ private void testCommitWithChecksumMismatch(boolean skipCrc) Path sourcePath = new Path(sourceBase + srcFilename); CopyListingFileStatus sourceCurrStatus = new CopyListingFileStatus(fs.getFileStatus(sourcePath)); - Assert.assertFalse(!DistCpUtils.checksumsAreEqual( - fs, new Path(sourceBase + srcFilename), null, - fs, new Path(targetBase + srcFilename), - sourceCurrStatus.getLen()) - .equals(CopyMapper.ChecksumComparison.FALSE)); + Assert.assertEquals("Checksum should not be equal", + DistCpUtils.checksumsAreEqual( + fs, new Path(sourceBase + srcFilename), null, + fs, new Path(targetBase + srcFilename), + sourceCurrStatus.getLen()), + CopyMapper.ChecksumComparison.FALSE); } catch(IOException exception) { if (skipCrc) { LOG.error("Unexpected exception is found", exception); From 58d8f84aa532f953da99ab8fe5bed9c28ea442f9 Mon Sep 17 00:00:00 2001 From: Mehakmeet Singh Date: Thu, 9 Feb 2023 16:27:30 +0530 Subject: [PATCH 9/9] HADOOP-18596. test fix. --- .../org/apache/hadoop/tools/mapred/TestCopyCommitter.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyCommitter.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyCommitter.java index f48608c89bddb..53002445db79f 100644 --- a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyCommitter.java +++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyCommitter.java @@ -563,11 +563,11 @@ private void testCommitWithChecksumMismatch(boolean skipCrc) CopyListingFileStatus sourceCurrStatus = new CopyListingFileStatus(fs.getFileStatus(sourcePath)); Assert.assertEquals("Checksum should not be equal", + CopyMapper.ChecksumComparison.FALSE, DistCpUtils.checksumsAreEqual( fs, new Path(sourceBase + srcFilename), null, fs, new Path(targetBase + srcFilename), - sourceCurrStatus.getLen()), - CopyMapper.ChecksumComparison.FALSE); + sourceCurrStatus.getLen())); } catch(IOException exception) { if (skipCrc) { LOG.error("Unexpected exception is found", exception);
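
For reference, a minimal sketch (not part of the patch series) of how a client could fall back to checksum-only skip decisions by disabling `distcp.update.modification.time` programmatically instead of via `core-site.xml`. It assumes the `DistCpConstants.CONF_LABEL_UPDATE_MOD_TIME` constant added above and the standard `DistCpOptions` builder shown in the tests; the source and target URIs are placeholders only.

```java
import java.util.Collections;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.tools.DistCp;
import org.apache.hadoop.tools.DistCpConstants;
import org.apache.hadoop.tools.DistCpOptions;

public class DistCpModTimeExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Assumption: rely purely on checksum comparison by disabling the
    // modification-time check introduced in HADOOP-18596 (default: true).
    conf.setBoolean(DistCpConstants.CONF_LABEL_UPDATE_MOD_TIME, false);

    // Placeholder source/target URIs; substitute real store paths.
    DistCpOptions options = new DistCpOptions.Builder(
        Collections.singletonList(new Path("s3a://source-bucket/data")),
        new Path("abfs://target@account.dfs.core.windows.net/data"))
        .withSyncFolder(true)   // equivalent to -update
        .build();

    // Run the copy job and wait for completion.
    new DistCp(conf, options).execute();
  }
}
```

With the flag left at its default of true, a same-length file whose checksums cannot be compared (for example across stores with different checksum algorithms) is only skipped when the target's modification time is not older than the source's.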