Skip to content

Commit f094248

Browse files
committed
HADOOP-18596. review comments
1 parent d23f13b commit f094248

File tree

2 files changed

+93
-25
lines changed

2 files changed

+93
-25
lines changed

hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -143,8 +143,13 @@ private DistCpConstants() {
143143

144144
public static final String CONF_LABEL_USE_ITERATOR = "distcp.use.iterator";
145145

146-
/** Distcp -update to use modification time of source and target file to
147-
* check while skipping.
146+
/**
147+
* Enabling distcp -update to use modification time of source and target
148+
* file to check while copying same file with same size but different content.
149+
*
150+
* The check would verify if the target file is perceived as older than the
151+
* source then it indicates that the source has been recently updated and it
152+
* is a newer version than what was synced, so we should not skip the copy.
148153
*/
149154
public static final String CONF_LABEL_UPDATE_MOD_TIME =
150155
"distcp.update.modification.time";

hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/contract/AbstractContractDistCpTest.java

Lines changed: 86 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@
5050
import org.apache.hadoop.tools.mapred.CopyMapper;
5151
import org.apache.hadoop.tools.util.DistCpTestUtils;
5252
import org.apache.hadoop.util.functional.RemoteIterators;
53+
import org.apache.http.annotation.Contract;
5354

5455
import org.assertj.core.api.Assertions;
5556
import org.junit.Before;
@@ -359,6 +360,29 @@ private Job distCpUpdate(final Path srcDir, final Path destDir)
359360
.withOverwrite(false)));
360361
}
361362

363+
/**
364+
* Run distcp -update srcDir destDir.
365+
* @param srcDir local source directory
366+
* @param destDir remote destination directory.
367+
* @return the completed job
368+
* @throws Exception any failure.
369+
*/
370+
private Job distCpUpdateWithFs(final Path srcDir, final Path destDir,
371+
FileSystem sourceFs, FileSystem targetFs)
372+
throws Exception {
373+
describe("\nDistcp -update from " + srcDir + " to " + destDir);
374+
lsR("Source Fs to update", sourceFs, srcDir);
375+
lsR("Target Fs before update", targetFs, destDir);
376+
return runDistCp(buildWithStandardOptions(
377+
new DistCpOptions.Builder(
378+
Collections.singletonList(srcDir), destDir)
379+
.withDeleteMissing(true)
380+
.withSyncFolder(true)
381+
.withSkipCRC(true)
382+
.withDirectWrite(shouldUseDirectWrite())
383+
.withOverwrite(false)));
384+
}
385+
362386
/**
363387
* Update the source directories as various tests expect,
364388
* including adding a new file.
@@ -868,35 +892,41 @@ public void testDistCpUpdateCheckFileSkip() throws Exception {
868892

869893
Path source = new Path(remoteDir, "file");
870894
Path dest = new Path(localDir, "file");
895+
896+
Path source_0byte = new Path(remoteDir, "file_0byte");
897+
Path dest_0byte = new Path(localDir, "file_0byte");
871898
dest = localFS.makeQualified(dest);
899+
dest_0byte = localFS.makeQualified(dest_0byte);
872900

873901
// Creating a source file with certain dataset.
874902
byte[] sourceBlock = dataset(10, 'a', 'z');
875903

876904
// Write the dataset and as well create the target path.
877-
try (FSDataOutputStream out = remoteFS.create(source)) {
878-
out.write(sourceBlock);
879-
localFS.create(dest);
880-
}
905+
ContractTestUtils.createFile(localFS, dest, true, sourceBlock);
906+
ContractTestUtils
907+
.writeDataset(remoteFS, source, sourceBlock, sourceBlock.length,
908+
1024, true);
881909

882-
verifyPathExists(remoteFS, "", source);
883-
verifyPathExists(localFS, "", dest);
884-
DistCpTestUtils
885-
.assertRunDistCp(DistCpConstants.SUCCESS, remoteDir.toString(),
886-
localDir.toString(), "-delete -update" + getDefaultCLIOptions(),
887-
conf);
910+
// Create 0 byte source and target files.
911+
ContractTestUtils.createFile(remoteFS, source_0byte, true, new byte[0]);
912+
ContractTestUtils.createFile(localFS, dest_0byte, true, new byte[0]);
913+
914+
// Execute the distcp -update job.
915+
Job job = distCpUpdateWithFs(remoteDir, localDir, remoteFS, localFS);
888916

889917
// First distcp -update would normally copy the source to dest.
890918
verifyFileContents(localFS, dest, sourceBlock);
919+
// Verify 1 file was skipped in the distcp -update(They 0 byte files).
920+
// Verify 1 file was copied in the distcp -update(The new source file).
921+
verifySkipAndCopyCounter(job, 1, 1);
891922

892923
// Remove the source file and replace with a file with same name and size
893924
// but different content.
894925
remoteFS.delete(source, false);
895926
Path updatedSource = new Path(remoteDir, "file");
896927
byte[] updatedSourceBlock = dataset(10, 'b', 'z');
897-
try (FSDataOutputStream out = remoteFS.create(updatedSource)) {
898-
out.write(updatedSourceBlock);
899-
}
928+
ContractTestUtils.writeDataset(remoteFS, updatedSource,
929+
updatedSourceBlock, updatedSourceBlock.length, 1024, true);
900930

901931
// For testing purposes we would take the modification time of the
902932
// updated Source file and add an offset or subtract the offset and set
@@ -913,32 +943,65 @@ public void testDistCpUpdateCheckFileSkip() throws Exception {
913943
long newTargetModTimeNew = modTimeSourceUpd + MODIFICATION_TIME_OFFSET;
914944
localFS.setTimes(dest, newTargetModTimeNew, -1);
915945

916-
DistCpTestUtils
917-
.assertRunDistCp(DistCpConstants.SUCCESS, remoteDir.toString(),
918-
localDir.toString(), "-delete -update" + getDefaultCLIOptions(),
919-
conf);
946+
// Execute the distcp -update job.
947+
Job updatedSourceJobOldSrc =
948+
distCpUpdateWithFs(remoteDir, localDir, remoteFS,
949+
localFS);
920950

921951
// File contents should remain same since the mod time for target is
922952
// newer than the updatedSource which indicates that the sync happened
923953
// more recently and there is no update.
924954
verifyFileContents(localFS, dest, sourceBlock);
955+
// Skipped both 0 byte file and sourceFile(since mod time of target is
956+
// older than the source it is perceived that source is of older version
957+
// and we can skip it's copy).
958+
verifySkipAndCopyCounter(updatedSourceJobOldSrc, 2, 0);
925959

926960
// Subtract by an offset which would ensure enough gap for the test to
927961
// not fail due to race conditions.
928962
long newTargetModTimeOld =
929963
Math.min(modTimeSourceUpd - MODIFICATION_TIME_OFFSET, 0);
930964
localFS.setTimes(dest, newTargetModTimeOld, -1);
931965

932-
DistCpTestUtils
933-
.assertRunDistCp(DistCpConstants.SUCCESS, remoteDir.toString(),
934-
localDir.toString(), "-delete -update" + getDefaultCLIOptions(),
935-
conf);
966+
// Execute the distcp -update job.
967+
Job updatedSourceJobNewSrc = distCpUpdateWithFs(remoteDir, localDir,
968+
remoteFS,
969+
localFS);
936970

937-
Assertions.assertThat(RemoteIterators.toList(localFS.listFiles(dest, true)))
938-
.hasSize(1);
971+
// Verifying the target directory have both 0 byte file and the content
972+
// file.
973+
Assertions
974+
.assertThat(RemoteIterators.toList(localFS.listFiles(localDir, true)))
975+
.hasSize(2);
939976
// Now the copy should take place and the file contents should change
940977
// since the mod time for target is older than the source file indicating
941978
// that there was an update to the source after the last sync took place.
942979
verifyFileContents(localFS, dest, updatedSourceBlock);
980+
// Verifying we skipped the 0 byte file and copied the updated source
981+
// file (since the modification time of the new source is older than the
982+
// target now).
983+
verifySkipAndCopyCounter(updatedSourceJobNewSrc, 1, 1);
984+
}
985+
986+
/**
987+
* Method to check the skipped and copied counters of a distcp job.
988+
*
989+
* @param job job to check.
990+
* @param skipExpectedValue expected skip counter value.
991+
* @param copyExpectedValue expected copy counter value.
992+
* @throws IOException throw in case of failures.
993+
*/
994+
private void verifySkipAndCopyCounter(Job job,
995+
int skipExpectedValue, int copyExpectedValue) throws IOException {
996+
// get the skip and copy counters from the job.
997+
long skipActualValue = job.getCounters()
998+
.findCounter(CopyMapper.Counter.SKIP).getValue();
999+
long copyActualValue = job.getCounters()
1000+
.findCounter(CopyMapper.Counter.COPY).getValue();
1001+
// Verify if the actual values equals the expected ones.
1002+
assertEquals("Mismatch in COPY counter value", copyExpectedValue,
1003+
copyActualValue);
1004+
assertEquals("Mismatch in SKIP counter value", skipExpectedValue,
1005+
skipActualValue);
9431006
}
9441007
}

0 commit comments

Comments
 (0)