|
17 | 17 | */ |
18 | 18 | package org.apache.hadoop.fs.contract.s3a; |
19 | 19 |
|
| 20 | +import java.io.ByteArrayInputStream; |
| 21 | +import java.util.HashMap; |
| 22 | +import java.util.Map; |
| 23 | + |
| 24 | +import org.junit.Test; |
20 | 25 | import org.slf4j.Logger; |
21 | 26 | import org.slf4j.LoggerFactory; |
22 | 27 |
|
23 | 28 | import org.apache.hadoop.conf.Configuration; |
| 29 | +import org.apache.hadoop.fs.FileStatus; |
| 30 | +import org.apache.hadoop.fs.MultipartUploader; |
| 31 | +import org.apache.hadoop.fs.PartHandle; |
24 | 32 | import org.apache.hadoop.fs.Path; |
| 33 | +import org.apache.hadoop.fs.UploadHandle; |
25 | 34 | import org.apache.hadoop.fs.contract.AbstractFSContract; |
26 | 35 | import org.apache.hadoop.fs.contract.AbstractContractMultipartUploaderTest; |
27 | 36 | import org.apache.hadoop.fs.contract.ContractTestUtils; |
28 | 37 | import org.apache.hadoop.fs.s3a.S3AFileSystem; |
29 | 38 | import org.apache.hadoop.fs.s3a.WriteOperationHelper; |
30 | 39 |
|
| 40 | +import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset; |
31 | 41 | import static org.apache.hadoop.fs.s3a.S3ATestConstants.*; |
32 | 42 | import static org.apache.hadoop.fs.s3a.S3ATestUtils.*; |
33 | 43 | import static org.apache.hadoop.fs.s3a.scale.AbstractSTestS3AHugeFiles.DEFAULT_HUGE_PARTITION_SIZE; |
| 44 | +import static org.apache.hadoop.test.LambdaTestUtils.eventually; |
34 | 45 |
|
35 | 46 | /** |
36 | 47 | * Test MultipartUploader with S3A. |
@@ -159,4 +170,47 @@ public void testDirectoryInTheWay() throws Exception { |
159 | 170 | public void testMultipartUploadReverseOrder() throws Exception { |
160 | 171 | ContractTestUtils.skip("skipped for speed"); |
161 | 172 | } |
| 173 | + |
| 174 | + /** |
| 175 | + * This creates and then deletes a zero-byte file while an upload |
| 176 | + * is in progress, and verifies that the uploaded file is ultimately |
| 177 | + * visible. |
| 178 | + */ |
| 179 | + @Test |
| 180 | + public void testMultipartOverlapWithTransientFile() throws Throwable { |
| 181 | + // until there's a way to explicitly ask for a multipart uploader from a |
| 182 | + // specific FS, explicitly create one bonded to the raw FS. |
| 183 | + describe("testMultipartOverlapWithTransientFile"); |
| 184 | + S3AFileSystem fs = getFileSystem(); |
| 185 | + Path path = path("testMultipartOverlapWithTransientFile"); |
| 186 | + fs.delete(path, true); |
| 187 | + MultipartUploader mpu = mpu(1); |
| 188 | + UploadHandle upload1 = mpu.initialize(path); |
| 189 | + byte[] dataset = dataset(1024, '0', 10); |
| 190 | + final Map<Integer, PartHandle> handles = new HashMap<>(); |
| 191 | + LOG.info("Uploading multipart entry"); |
| 192 | + PartHandle value = mpu.putPart(path, new ByteArrayInputStream(dataset), 1, |
| 193 | + upload1, |
| 194 | + dataset.length); |
| 195 | + // upload 1K |
| 196 | + handles.put(1, value); |
| 197 | + // confirm the path is absent |
| 198 | + ContractTestUtils.assertPathDoesNotExist(fs, |
| 199 | + "path being uploaded", path); |
| 200 | + // now create an empty file |
| 201 | + ContractTestUtils.touch(fs, path); |
| 202 | + final FileStatus touchStatus = fs.getFileStatus(path); |
| 203 | + LOG.info("0-byte file has been created: {}", touchStatus); |
| 204 | + fs.delete(path, false); |
| 205 | + // now complete the upload |
| 206 | + mpu.complete(path, handles, upload1); |
| 207 | + |
| 208 | + // wait for the data to arrive |
| 209 | + eventually(timeToBecomeConsistentMillis(), 500, () -> { |
| 210 | + FileStatus mpuStatus = fs.getFileStatus(path); |
| 211 | + assertTrue("File is empty in " + mpuStatus, mpuStatus.getLen() > 0); |
| 212 | + return mpuStatus; |
| 213 | + }); |
| 214 | + |
| 215 | + } |
162 | 216 | } |
0 commit comments