Skip to content

Commit 040a202

Browse files
committed
HADOOP-15323. AliyunOSS: Improve copy file performance for AliyunOSSFileSystemStore. Contributed wujinhu.
1 parent f660e5e commit 040a202

File tree

8 files changed

+36
-42
lines changed

8 files changed

+36
-42
lines changed

hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSCopyFileTask.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,13 +32,16 @@ public class AliyunOSSCopyFileTask implements Runnable {
3232

3333
private AliyunOSSFileSystemStore store;
3434
private String srcKey;
35+
private long srcLen;
3536
private String dstKey;
3637
private AliyunOSSCopyFileContext copyFileContext;
3738

3839
public AliyunOSSCopyFileTask(AliyunOSSFileSystemStore store,
39-
String srcKey, String dstKey, AliyunOSSCopyFileContext copyFileContext) {
40+
String srcKey, long srcLen,
41+
String dstKey, AliyunOSSCopyFileContext copyFileContext) {
4042
this.store = store;
4143
this.srcKey = srcKey;
44+
this.srcLen = srcLen;
4245
this.dstKey = dstKey;
4346
this.copyFileContext = copyFileContext;
4447
}
@@ -47,7 +50,7 @@ public AliyunOSSCopyFileTask(AliyunOSSFileSystemStore store,
4750
public void run() {
4851
boolean fail = false;
4952
try {
50-
store.copyFile(srcKey, dstKey);
53+
store.copyFile(srcKey, srcLen, dstKey);
5154
} catch (Exception e) {
5255
LOG.warn("Exception thrown when copy from "
5356
+ srcKey + " to " + dstKey + ", exception: " + e);

hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystem.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -653,7 +653,7 @@ public boolean rename(Path srcPath, Path dstPath) throws IOException {
653653
if (srcStatus.isDirectory()) {
654654
copyDirectory(srcPath, dstPath);
655655
} else {
656-
copyFile(srcPath, dstPath);
656+
copyFile(srcPath, srcStatus.getLen(), dstPath);
657657
}
658658

659659
return srcPath.equals(dstPath) || delete(srcPath, true);
@@ -664,13 +664,14 @@ public boolean rename(Path srcPath, Path dstPath) throws IOException {
664664
* (the caller should make sure srcPath is a file and dstPath is valid)
665665
*
666666
* @param srcPath source path.
667+
* @param srcLen source path length if it is a file.
667668
* @param dstPath destination path.
668669
* @return true if file is successfully copied.
669670
*/
670-
private boolean copyFile(Path srcPath, Path dstPath) {
671+
private boolean copyFile(Path srcPath, long srcLen, Path dstPath) {
671672
String srcKey = pathToKey(srcPath);
672673
String dstKey = pathToKey(dstPath);
673-
return store.copyFile(srcKey, dstKey);
674+
return store.copyFile(srcKey, srcLen, dstKey);
674675
}
675676

676677
/**
@@ -709,7 +710,8 @@ private boolean copyDirectory(Path srcPath, Path dstPath) throws IOException {
709710

710711
//copy operation just copies metadata, oss will support shallow copy
711712
executorService.execute(new AliyunOSSCopyFileTask(
712-
store, objectSummary.getKey(), newKey, copyFileContext));
713+
store, objectSummary.getKey(),
714+
objectSummary.getSize(), newKey, copyFileContext));
713715
copiesToFinish++;
714716
// No need to call lock() here.
715717
// It's ok to copy one more file if the rename operation failed

hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystemStore.java

Lines changed: 10 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,6 @@ public class AliyunOSSFileSystemStore {
8787
private OSSClient ossClient;
8888
private String bucketName;
8989
private long uploadPartSize;
90-
private long multipartThreshold;
9190
private int maxKeys;
9291
private String serverSideEncryptionAlgorithm;
9392

@@ -155,21 +154,10 @@ public void initialize(URI uri, Configuration conf, String user,
155154
ossClient = new OSSClient(endPoint, provider, clientConf);
156155
uploadPartSize = AliyunOSSUtils.getMultipartSizeProperty(conf,
157156
MULTIPART_UPLOAD_PART_SIZE_KEY, MULTIPART_UPLOAD_PART_SIZE_DEFAULT);
158-
multipartThreshold = conf.getLong(MIN_MULTIPART_UPLOAD_THRESHOLD_KEY,
159-
MIN_MULTIPART_UPLOAD_THRESHOLD_DEFAULT);
157+
160158
serverSideEncryptionAlgorithm =
161159
conf.get(SERVER_SIDE_ENCRYPTION_ALGORITHM_KEY, "");
162160

163-
if (multipartThreshold < 5 * 1024 * 1024) {
164-
LOG.warn(MIN_MULTIPART_UPLOAD_THRESHOLD_KEY + " must be at least 5 MB");
165-
multipartThreshold = 5 * 1024 * 1024;
166-
}
167-
168-
if (multipartThreshold > 1024 * 1024 * 1024) {
169-
LOG.warn(MIN_MULTIPART_UPLOAD_THRESHOLD_KEY + " must be less than 1 GB");
170-
multipartThreshold = 1024 * 1024 * 1024;
171-
}
172-
173161
bucketName = uri.getHost();
174162

175163
String cannedACLName = conf.get(CANNED_ACL_KEY, CANNED_ACL_DEFAULT);
@@ -305,18 +293,19 @@ public void storeEmptyFile(String key) throws IOException {
305293
* Copy an object from source key to destination key.
306294
*
307295
* @param srcKey source key.
296+
* @param srcLen source file length.
308297
* @param dstKey destination key.
309298
* @return true if file is successfully copied.
310299
*/
311-
public boolean copyFile(String srcKey, String dstKey) {
312-
ObjectMetadata objectMeta =
313-
ossClient.getObjectMetadata(bucketName, srcKey);
314-
statistics.incrementReadOps(1);
315-
long contentLength = objectMeta.getContentLength();
316-
if (contentLength <= multipartThreshold) {
300+
public boolean copyFile(String srcKey, long srcLen, String dstKey) {
301+
try {
302+
//1, try single copy first
317303
return singleCopy(srcKey, dstKey);
318-
} else {
319-
return multipartCopy(srcKey, contentLength, dstKey);
304+
} catch (Exception e) {
305+
//2, if failed(shallow copy not supported), then multi part copy
306+
LOG.debug("Exception thrown when copy file: " + srcKey
307+
+ ", exception: " + e + ", use multipartCopy instead");
308+
return multipartCopy(srcKey, srcLen, dstKey);
320309
}
321310
}
322311

hadoop-tools/hadoop-aliyun/src/site/markdown/tools/hadoop-aliyun/index.md

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -282,7 +282,9 @@ please raise your issues with them.
282282
<property>
283283
<name>fs.oss.multipart.upload.threshold</name>
284284
<value>20971520</value>
285-
<description>Minimum size in bytes before we start a multipart uploads or copy.</description>
285+
<description>Minimum size in bytes before we start a multipart uploads or copy.
286+
Notice: This property is deprecated and will be removed in further version.
287+
</description>
286288
</property>
287289

288290
<property>

hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestAliyunOSSBlockOutputStream.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,6 @@ public class TestAliyunOSSBlockOutputStream {
4949
@Before
5050
public void setUp() throws Exception {
5151
Configuration conf = new Configuration();
52-
conf.setLong(Constants.MIN_MULTIPART_UPLOAD_THRESHOLD_KEY, 5 * 1024 * 1024);
5352
conf.setInt(Constants.MULTIPART_UPLOAD_PART_SIZE_KEY, 1024 * 1024);
5453
conf.setInt(IO_CHUNK_BUFFER_SIZE,
5554
conf.getInt(Constants.MULTIPART_UPLOAD_PART_SIZE_KEY, 0));

hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestAliyunOSSFileSystemContract.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -178,13 +178,13 @@ public void testRenameDirectoryCopyTaskAllSucceed() throws Exception {
178178
AliyunOSSFileSystemStore store = ((AliyunOSSFileSystem)this.fs).getStore();
179179
store.storeEmptyFile("test/new/file/");
180180
AliyunOSSCopyFileTask oneCopyFileTask = new AliyunOSSCopyFileTask(
181-
store, srcOne.toUri().getPath().substring(1),
181+
store, srcOne.toUri().getPath().substring(1), data.length,
182182
dstOne.toUri().getPath().substring(1), copyFileContext);
183183
oneCopyFileTask.run();
184184
assumeFalse(copyFileContext.isCopyFailure());
185185

186186
AliyunOSSCopyFileTask twoCopyFileTask = new AliyunOSSCopyFileTask(
187-
store, srcOne.toUri().getPath().substring(1),
187+
store, srcOne.toUri().getPath().substring(1), data.length,
188188
dstTwo.toUri().getPath().substring(1), copyFileContext);
189189
twoCopyFileTask.run();
190190
assumeFalse(copyFileContext.isCopyFailure());
@@ -212,13 +212,13 @@ public void testRenameDirectoryCopyTaskAllFailed() throws Exception {
212212
AliyunOSSFileSystemStore store = ((AliyunOSSFileSystem)this.fs).getStore();
213213
//store.storeEmptyFile("test/new/file/");
214214
AliyunOSSCopyFileTask oneCopyFileTask = new AliyunOSSCopyFileTask(
215-
store, srcOne.toUri().getPath().substring(1),
215+
store, srcOne.toUri().getPath().substring(1), data.length,
216216
dstOne.toUri().getPath().substring(1), copyFileContext);
217217
oneCopyFileTask.run();
218218
assumeTrue(copyFileContext.isCopyFailure());
219219

220220
AliyunOSSCopyFileTask twoCopyFileTask = new AliyunOSSCopyFileTask(
221-
store, srcOne.toUri().getPath().substring(1),
221+
store, srcOne.toUri().getPath().substring(1), data.length,
222222
dstTwo.toUri().getPath().substring(1), copyFileContext);
223223
twoCopyFileTask.run();
224224
assumeTrue(copyFileContext.isCopyFailure());
@@ -247,19 +247,19 @@ public void testRenameDirectoryCopyTaskPartialFailed() throws Exception {
247247
AliyunOSSFileSystemStore store = ((AliyunOSSFileSystem)this.fs).getStore();
248248
//store.storeEmptyFile("test/new/file/");
249249
AliyunOSSCopyFileTask oneCopyFileTask = new AliyunOSSCopyFileTask(
250-
store, srcOne.toUri().getPath().substring(1),
250+
store, srcOne.toUri().getPath().substring(1), data.length,
251251
dstOne.toUri().getPath().substring(1), copyFileContext);
252252
oneCopyFileTask.run();
253253
assumeTrue(copyFileContext.isCopyFailure());
254254

255255
AliyunOSSCopyFileTask twoCopyFileTask = new AliyunOSSCopyFileTask(
256-
store, srcOne.toUri().getPath().substring(1),
256+
store, srcOne.toUri().getPath().substring(1), data.length,
257257
dstTwo.toUri().getPath().substring(1), copyFileContext);
258258
twoCopyFileTask.run();
259259
assumeTrue(copyFileContext.isCopyFailure());
260260

261261
AliyunOSSCopyFileTask threeCopyFileTask = new AliyunOSSCopyFileTask(
262-
store, srcOne.toUri().getPath().substring(1),
262+
store, srcOne.toUri().getPath().substring(1), data.length,
263263
dstThree.toUri().getPath().substring(1), copyFileContext);
264264
threeCopyFileTask.run();
265265
assumeTrue(copyFileContext.isCopyFailure());

hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestAliyunOSSFileSystemStore.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -78,8 +78,6 @@ public static void checkSettings() throws Exception {
7878

7979
protected void writeRenameReadCompare(Path path, long len)
8080
throws IOException, NoSuchAlgorithmException {
81-
// If len > fs.oss.multipart.upload.threshold,
82-
// we'll use a multipart upload copy
8381
MessageDigest digest = MessageDigest.getInstance("MD5");
8482
OutputStream out = new BufferedOutputStream(
8583
new DigestOutputStream(fs.create(path, false), digest));
@@ -92,10 +90,12 @@ protected void writeRenameReadCompare(Path path, long len)
9290
assertTrue("Exists", fs.exists(path));
9391

9492
Path copyPath = path.suffix(".copy");
93+
long start = System.currentTimeMillis();
9594
fs.rename(path, copyPath);
9695

9796
assertTrue("Copy exists", fs.exists(copyPath));
98-
97+
// should less than 1 second
98+
assertTrue(System.currentTimeMillis() - start < 1000);
9999
// Download file from Aliyun OSS and compare the digest against the original
100100
MessageDigest digest2 = MessageDigest.getInstance("MD5");
101101
InputStream in = new BufferedInputStream(
@@ -119,7 +119,7 @@ public void testSmallUpload() throws IOException, NoSuchAlgorithmException {
119119
@Test
120120
public void testLargeUpload()
121121
throws IOException, NoSuchAlgorithmException {
122-
// Multipart upload, multipart copy
123-
writeRenameReadCompare(new Path("/test/xlarge"), 52428800L); // 50MB byte
122+
// Multipart upload, shallow copy
123+
writeRenameReadCompare(new Path("/test/xlarge"), 2147483648L); // 2GB
124124
}
125125
}

hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/contract/TestAliyunOSSContractDistCp.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@ public class TestAliyunOSSContractDistCp extends AbstractContractDistCpTest {
3232
@Override
3333
protected Configuration createConfiguration() {
3434
Configuration newConf = super.createConfiguration();
35-
newConf.setLong(MIN_MULTIPART_UPLOAD_THRESHOLD_KEY, MULTIPART_SETTING);
3635
newConf.setLong(MULTIPART_UPLOAD_PART_SIZE_KEY, MULTIPART_SETTING);
3736
return newConf;
3837
}

0 commit comments

Comments
 (0)