Skip to content

Commit 9e4f50d

Browse files
authored
HADOOP-18596. Distcp -update to use modification time while checking for file skip. (apache#5308)
Adding toggleable support for modification time during distcp -update between two stores with incompatible checksum comparison. Contributed by: Mehakmeet Singh <[email protected]>
1 parent 113a9e4 commit 9e4f50d

File tree

6 files changed

+296
-24
lines changed

6 files changed

+296
-24
lines changed

hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,26 @@ private DistCpConstants() {
142142
"distcp.blocks.per.chunk";
143143

144144
public static final String CONF_LABEL_USE_ITERATOR = "distcp.use.iterator";
145+
146+
/**
147+
* Enabling {@code distcp -update} to use modification time of source and
148+
* target file to check while copying same file with same size but
149+
* different content.
150+
*
151+
* The check verifies whether the target file is perceived as older than the
152+
* source; if so, it indicates that the source has been recently updated and
153+
* is a newer version than what was synced, so we should not skip the copy.
154+
* {@value}
155+
*/
156+
public static final String CONF_LABEL_UPDATE_MOD_TIME =
157+
"distcp.update.modification.time";
158+
159+
/**
160+
* Default value for 'distcp.update.modification.time' configuration.
161+
*/
162+
public static final boolean CONF_LABEL_UPDATE_MOD_TIME_DEFAULT =
163+
true;
164+
145165
/**
146166
* Constants for DistCp return code to shell / consumer of ToolRunner's run
147167
*/

hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyMapper.java

Lines changed: 72 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,8 @@
4141
import org.apache.hadoop.tools.util.DistCpUtils;
4242
import org.apache.hadoop.util.StringUtils;
4343

44+
import static org.apache.hadoop.tools.DistCpConstants.CONF_LABEL_UPDATE_MOD_TIME_DEFAULT;
45+
4446
/**
4547
* Mapper class that executes the DistCp copy operation.
4648
* Implements the o.a.h.mapreduce.Mapper interface.
@@ -74,6 +76,15 @@ static enum FileAction {
7476
OVERWRITE, // Overwrite the whole file
7577
}
7678

79+
/**
80+
* Indicates the checksum comparison result.
81+
*/
82+
public enum ChecksumComparison {
83+
TRUE, // checksum comparison is compatible and true.
84+
FALSE, // checksum comparison is compatible and false.
85+
INCOMPATIBLE, // checksum comparison is not compatible.
86+
}
87+
7788
private static Logger LOG = LoggerFactory.getLogger(CopyMapper.class);
7889

7990
private Configuration conf;
@@ -85,6 +96,7 @@ static enum FileAction {
8596
private boolean append = false;
8697
private boolean verboseLog = false;
8798
private boolean directWrite = false;
99+
private boolean useModTimeToUpdate;
88100
private EnumSet<FileAttribute> preserve = EnumSet.noneOf(FileAttribute.class);
89101

90102
private FileSystem targetFS = null;
@@ -114,6 +126,9 @@ public void setup(Context context) throws IOException, InterruptedException {
114126
PRESERVE_STATUS.getConfigLabel()));
115127
directWrite = conf.getBoolean(
116128
DistCpOptionSwitch.DIRECT_WRITE.getConfigLabel(), false);
129+
useModTimeToUpdate =
130+
conf.getBoolean(DistCpConstants.CONF_LABEL_UPDATE_MOD_TIME,
131+
CONF_LABEL_UPDATE_MOD_TIME_DEFAULT);
117132

118133
targetWorkPath = new Path(conf.get(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH));
119134
Path targetFinalPath = new Path(conf.get(
@@ -354,13 +369,65 @@ private boolean canSkip(FileSystem sourceFS, CopyListingFileStatus source,
354369
boolean sameLength = target.getLen() == source.getLen();
355370
boolean sameBlockSize = source.getBlockSize() == target.getBlockSize()
356371
|| !preserve.contains(FileAttribute.BLOCKSIZE);
372+
// Skip the copy if a 0 size file is being copied.
373+
if (sameLength && source.getLen() == 0) {
374+
return true;
375+
}
376+
// If the src and target file have same size and block size, we would
377+
// check if the skipCrc flag is enabled or not. If enabled, and the
378+
// modTime comparison is enabled then return true if target file is not older
379+
// than the source file, since this indicates that the target file is
380+
// recently updated and the source is not changed more recently than the
381+
// update, we can skip the copy else we would copy.
382+
// If skipCrc flag is disabled, we would check the checksum comparison
383+
// which is an enum representing 3 values, of which if the comparison
384+
// returns INCOMPATIBLE, we'll fall back to the modtime check, else return
385+
// the result of checksum comparison which are compatible(true or false).
386+
//
387+
// Note: Different object stores can have different checksum algorithms
388+
// resulting in no checksum comparison, which used to return true
389+
// always, having the modification time enabled can help in these
390+
// scenarios to not incorrectly skip a copy. Refer: HADOOP-18596.
391+
357392
if (sameLength && sameBlockSize) {
358-
return skipCrc ||
359-
DistCpUtils.checksumsAreEqual(sourceFS, source.getPath(), null,
360-
targetFS, target.getPath(), source.getLen());
361-
} else {
362-
return false;
393+
if (skipCrc) {
394+
return maybeUseModTimeToCompare(source, target);
395+
} else {
396+
ChecksumComparison checksumComparison = DistCpUtils
397+
.checksumsAreEqual(sourceFS, source.getPath(), null,
398+
targetFS, target.getPath(), source.getLen());
399+
LOG.debug("Result of checksum comparison between src {} and target "
400+
+ "{} : {}", source, target, checksumComparison);
401+
if (checksumComparison.equals(ChecksumComparison.INCOMPATIBLE)) {
402+
return maybeUseModTimeToCompare(source, target);
403+
}
404+
// if skipCrc is disabled and checksumComparison is compatible we
405+
// need not check the mod time.
406+
return checksumComparison.equals(ChecksumComparison.TRUE);
407+
}
408+
}
409+
return false;
410+
}
411+
412+
/**
413+
* If the mod time comparison is enabled, check the mod time else return
414+
* true (i.e. skip the copy).
415+
* Comparison: If the target file is perceived to have greater or equal mod time
416+
* (i.e. is not older) than the source file, we can assume that there has been no new
417+
* changes that occurred in the source file, hence we should return true to
418+
* skip the copy of the file.
419+
*
420+
* @param source Source fileStatus.
421+
* @param target Target fileStatus.
422+
* @return boolean representing result of modTime check.
423+
*/
424+
private boolean maybeUseModTimeToCompare(
425+
CopyListingFileStatus source, FileStatus target) {
426+
if (useModTimeToUpdate) {
427+
return source.getModificationTime() <= target.getModificationTime();
363428
}
429+
// if we cannot check mod time, return true (skip the copy).
430+
return true;
364431
}
365432

366433
@Override

hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/DistCpUtils.java

Lines changed: 22 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import org.apache.hadoop.tools.CopyListingFileStatus;
4242
import org.apache.hadoop.tools.DistCpContext;
4343
import org.apache.hadoop.tools.DistCpOptions.FileAttribute;
44+
import org.apache.hadoop.tools.mapred.CopyMapper;
4445
import org.apache.hadoop.tools.mapred.UniformSizeInputFormat;
4546
import org.apache.hadoop.util.StringUtils;
4647

@@ -568,10 +569,12 @@ public static String getStringDescriptionFor(long nBytes) {
568569
* and false otherwise.
569570
* @throws IOException if there's an exception while retrieving checksums.
570571
*/
571-
public static boolean checksumsAreEqual(FileSystem sourceFS, Path source,
572-
FileChecksum sourceChecksum,
573-
FileSystem targetFS,
574-
Path target, long sourceLen)
572+
public static CopyMapper.ChecksumComparison checksumsAreEqual(
573+
FileSystem sourceFS,
574+
Path source,
575+
FileChecksum sourceChecksum,
576+
FileSystem targetFS,
577+
Path target, long sourceLen)
575578
throws IOException {
576579
FileChecksum targetChecksum = null;
577580
try {
@@ -585,8 +588,15 @@ public static boolean checksumsAreEqual(FileSystem sourceFS, Path source,
585588
} catch (IOException e) {
586589
LOG.error("Unable to retrieve checksum for " + source + " or " + target, e);
587590
}
588-
return (sourceChecksum == null || targetChecksum == null ||
589-
sourceChecksum.equals(targetChecksum));
591+
// If the source or target checksum is null, that means there is no
592+
// comparison that took place and return not compatible.
593+
// else if matched, return compatible with the matched result.
594+
if (sourceChecksum == null || targetChecksum == null) {
595+
return CopyMapper.ChecksumComparison.INCOMPATIBLE;
596+
} else if (sourceChecksum.equals(targetChecksum)) {
597+
return CopyMapper.ChecksumComparison.TRUE;
598+
}
599+
return CopyMapper.ChecksumComparison.FALSE;
590600
}
591601

592602
/**
@@ -613,8 +623,12 @@ public static void compareFileLengthsAndChecksums(long srcLen,
613623

614624
//At this point, src & dest lengths are same. if length==0, we skip checksum
615625
if ((srcLen != 0) && (!skipCrc)) {
616-
if (!checksumsAreEqual(sourceFS, source, sourceChecksum,
617-
targetFS, target, srcLen)) {
626+
CopyMapper.ChecksumComparison
627+
checksumComparison = checksumsAreEqual(sourceFS, source, sourceChecksum,
628+
targetFS, target, srcLen);
629+
// Only an explicit FALSE comparison is a mismatch; TRUE and INCOMPATIBLE both pass.
630+
boolean checksumResult = !checksumComparison.equals(CopyMapper.ChecksumComparison.FALSE);
631+
if (!checksumResult) {
618632
StringBuilder errorMessage =
619633
new StringBuilder(DistCpConstants.CHECKSUM_MISMATCH_ERROR_MSG)
620634
.append(source).append(" and ").append(target).append(".");

hadoop-tools/hadoop-distcp/src/site/markdown/DistCp.md.vm

Lines changed: 31 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -631,14 +631,37 @@ hadoop distcp -update -numListstatusThreads 20 \
631631
Because object stores are slow to list files, consider setting the `-numListstatusThreads` option when performing a `-update` operation
632632
on a large directory tree (the limit is 40 threads).
633633

634-
When `DistCp -update` is used with object stores,
635-
generally only the modification time and length of the individual files are compared,
636-
not any checksums. The fact that most object stores do have valid timestamps
637-
for directories is irrelevant; only the file timestamps are compared.
638-
However, it is important to have the clock of the client computers close
639-
to that of the infrastructure, so that timestamps are consistent between
640-
the client/HDFS cluster and that of the object store. Otherwise, changed files may be
641-
missed/copied too often.
634+
When `DistCp -update` is used with object stores, generally only the
635+
modification time and length of the individual files are compared, not any
636+
checksums if the checksum algorithm between the two stores is different.
637+
638+
* The `distcp -update` between two object stores with different checksum
639+
algorithm compares the modification times of source and target files along
640+
with the file size to determine whether to skip the file copy. The behavior
641+
is controlled by the property `distcp.update.modification.time`, which is
642+
set to true by default. If the source file is more recently modified than
643+
the target file, it is assumed that the content has changed, and the file
644+
should be updated.
645+
We need to ensure that there is no clock skew between the machines.
646+
The fact that most object stores do have valid timestamps for directories
647+
is irrelevant; only the file timestamps are compared. However, it is
648+
important to have the clock of the client computers close to that of the
649+
infrastructure, so that timestamps are consistent between the client/HDFS
650+
cluster and that of the object store. Otherwise, changed files may be
651+
missed/copied too often.
652+
653+
* `distcp.update.modification.time` would only be used if either of the two
654+
stores doesn't have checksum validation, resulting in an incompatible checksum
655+
comparison between the two. Even if the property is set to true, it won't
656+
be used if there is a valid checksum comparison between the two stores.
657+
658+
To turn off the modification time check, set this in your core-site.xml
659+
```xml
660+
<property>
661+
<name>distcp.update.modification.time</name>
662+
<value>false</value>
663+
</property>
664+
```
642665

643666
**Notes**
644667

0 commit comments

Comments
 (0)