Skip to content

Commit feccd28

Browse files
committed
HADOOP-16107. Update ChecksumFileSystem createFile/openFile API to generate checksum.
Contributed by Steve Loughran
1 parent 5b43e42 commit feccd28

File tree

5 files changed

+456
-5
lines changed

5 files changed

+456
-5
lines changed

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFileSystem.java

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,15 +24,22 @@
2424
import java.io.InputStream;
2525
import java.nio.channels.ClosedChannelException;
2626
import java.util.Arrays;
27+
import java.util.Collections;
28+
import java.util.EnumSet;
2729
import java.util.List;
30+
import java.util.Set;
31+
import java.util.concurrent.CompletableFuture;
2832

2933
import com.google.common.base.Preconditions;
3034
import org.apache.hadoop.classification.InterfaceAudience;
3135
import org.apache.hadoop.classification.InterfaceStability;
3236
import org.apache.hadoop.conf.Configuration;
37+
import org.apache.hadoop.fs.impl.AbstractFSBuilderImpl;
38+
import org.apache.hadoop.fs.impl.FutureDataInputStreamBuilderImpl;
3339
import org.apache.hadoop.fs.permission.AclEntry;
3440
import org.apache.hadoop.fs.permission.FsPermission;
3541
import org.apache.hadoop.util.DataChecksum;
42+
import org.apache.hadoop.util.LambdaUtils;
3643
import org.apache.hadoop.util.Progressable;
3744

3845
/****************************************************************
@@ -484,6 +491,32 @@ public FSDataOutputStream createNonRecursive(Path f, FsPermission permission,
484491
blockSize, progress);
485492
}
486493

494+
@Override
495+
public FSDataOutputStream create(final Path f,
496+
final FsPermission permission,
497+
final EnumSet<CreateFlag> flags,
498+
final int bufferSize,
499+
final short replication,
500+
final long blockSize,
501+
final Progressable progress,
502+
final Options.ChecksumOpt checksumOpt) throws IOException {
503+
return create(f, permission, flags.contains(CreateFlag.OVERWRITE),
504+
bufferSize, replication, blockSize, progress);
505+
}
506+
507+
@Override
508+
public FSDataOutputStream createNonRecursive(final Path f,
509+
final FsPermission permission,
510+
final EnumSet<CreateFlag> flags,
511+
final int bufferSize,
512+
final short replication,
513+
final long blockSize,
514+
final Progressable progress) throws IOException {
515+
return create(f, permission, flags.contains(CreateFlag.OVERWRITE),
516+
false, bufferSize, replication,
517+
blockSize, progress);
518+
}
519+
487520
abstract class FsOperation {
488521
boolean run(Path p) throws IOException {
489522
boolean status = apply(p);
@@ -780,4 +813,57 @@ public boolean reportChecksumFailure(Path f, FSDataInputStream in,
780813
long inPos, FSDataInputStream sums, long sumsPos) {
781814
return false;
782815
}
816+
817+
/**
818+
* This is overridden to ensure that this class's
819+
* {@link #openFileWithOptions}() method is called, and so ultimately
820+
* its {@link #open(Path, int)}.
821+
*
822+
* {@inheritDoc}
823+
*/
824+
@Override
825+
public FutureDataInputStreamBuilder openFile(final Path path)
826+
throws IOException, UnsupportedOperationException {
827+
return ((FutureDataInputStreamBuilderImpl)
828+
createDataInputStreamBuilder(this, path)).getThisBuilder();
829+
}
830+
831+
/**
832+
* Open the file as a blocking call to {@link #open(Path, int)}.
833+
*
834+
* {@inheritDoc}
835+
*/
836+
@Override
837+
protected CompletableFuture<FSDataInputStream> openFileWithOptions(
838+
final Path path,
839+
final Set<String> mandatoryKeys,
840+
final Configuration options,
841+
final int bufferSize) throws IOException {
842+
AbstractFSBuilderImpl.rejectUnknownMandatoryKeys(mandatoryKeys,
843+
Collections.emptySet(),
844+
"for " + path);
845+
return LambdaUtils.eval(
846+
new CompletableFuture<>(), () -> open(path, bufferSize));
847+
}
848+
849+
/**
850+
* This is overridden to ensure that this class's create() method is
851+
* ultimately called.
852+
*
853+
* {@inheritDoc}
854+
*/
855+
public FSDataOutputStreamBuilder createFile(Path path) {
856+
return createDataOutputStreamBuilder(this, path)
857+
.create().overwrite(true);
858+
}
859+
860+
/**
861+
* This is overridden to ensure that this class's create() method is
862+
* ultimately called.
863+
*
864+
* {@inheritDoc}
865+
*/
866+
public FSDataOutputStreamBuilder appendFile(Path path) {
867+
return createDataOutputStreamBuilder(this, path).append();
868+
}
783869
}

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java

Lines changed: 55 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4240,14 +4240,34 @@ public static GlobalStorageStatistics getGlobalStorageStatistics() {
42404240
return GlobalStorageStatistics.INSTANCE;
42414241
}
42424242

4243+
/**
4244+
* Create instance of the standard FSDataOutputStreamBuilder for the
4245+
* given filesystem and path.
4246+
* @param fileSystem owner
4247+
* @param path path to create
4248+
* @return a builder.
4249+
*/
4250+
@InterfaceStability.Unstable
4251+
protected static FSDataOutputStreamBuilder createDataOutputStreamBuilder(
4252+
@Nonnull final FileSystem fileSystem,
4253+
@Nonnull final Path path) {
4254+
return new FileSystemDataOutputStreamBuilder(fileSystem, path);
4255+
}
4256+
4257+
/**
4258+
* Standard implementation of the FSDataOutputStreamBuilder; invokes
4259+
* create/createNonRecursive or Append depending upon the options.
4260+
*/
42434261
private static final class FileSystemDataOutputStreamBuilder extends
42444262
FSDataOutputStreamBuilder<FSDataOutputStream,
42454263
FileSystemDataOutputStreamBuilder> {
42464264

42474265
/**
42484266
* Constructor.
4267+
* @param fileSystem owner
4268+
* @param p path to create
42494269
*/
4250-
protected FileSystemDataOutputStreamBuilder(FileSystem fileSystem, Path p) {
4270+
private FileSystemDataOutputStreamBuilder(FileSystem fileSystem, Path p) {
42514271
super(fileSystem, p);
42524272
}
42534273

@@ -4290,7 +4310,7 @@ public FileSystemDataOutputStreamBuilder getThisBuilder() {
42904310
* builder interface becomes stable.
42914311
*/
42924312
public FSDataOutputStreamBuilder createFile(Path path) {
4293-
return new FileSystemDataOutputStreamBuilder(this, path)
4313+
return createDataOutputStreamBuilder(this, path)
42944314
.create().overwrite(true);
42954315
}
42964316

@@ -4300,7 +4320,7 @@ public FSDataOutputStreamBuilder createFile(Path path) {
43004320
* @return a {@link FSDataOutputStreamBuilder} to build file append request.
43014321
*/
43024322
public FSDataOutputStreamBuilder appendFile(Path path) {
4303-
return new FileSystemDataOutputStreamBuilder(this, path).append();
4323+
return createDataOutputStreamBuilder(this, path).append();
43044324
}
43054325

43064326
/**
@@ -4321,7 +4341,7 @@ public FSDataOutputStreamBuilder appendFile(Path path) {
43214341
@InterfaceStability.Unstable
43224342
public FutureDataInputStreamBuilder openFile(Path path)
43234343
throws IOException, UnsupportedOperationException {
4324-
return new FSDataInputStreamBuilder(this, path).getThisBuilder();
4344+
return createDataInputStreamBuilder(this, path).getThisBuilder();
43254345
}
43264346

43274347
/**
@@ -4340,7 +4360,7 @@ public FutureDataInputStreamBuilder openFile(Path path)
43404360
@InterfaceStability.Unstable
43414361
public FutureDataInputStreamBuilder openFile(PathHandle pathHandle)
43424362
throws IOException, UnsupportedOperationException {
4343-
return new FSDataInputStreamBuilder(this, pathHandle)
4363+
return createDataInputStreamBuilder(this, pathHandle)
43444364
.getThisBuilder();
43454365
}
43464366

@@ -4416,6 +4436,36 @@ protected CompletableFuture<FSDataInputStream> openFileWithOptions(
44164436
return result;
44174437
}
44184438

4439+
/**
4440+
* Create instance of the standard {@link FSDataInputStreamBuilder} for the
4441+
* given filesystem and path.
4442+
* @param fileSystem owner
4443+
* @param path path to read
4444+
* @return a builder.
4445+
*/
4446+
@InterfaceAudience.LimitedPrivate("Filesystems")
4447+
@InterfaceStability.Unstable
4448+
protected static FSDataInputStreamBuilder createDataInputStreamBuilder(
4449+
@Nonnull final FileSystem fileSystem,
4450+
@Nonnull final Path path) {
4451+
return new FSDataInputStreamBuilder(fileSystem, path);
4452+
}
4453+
4454+
/**
4455+
* Create instance of the standard {@link FSDataInputStreamBuilder} for the
4456+
* given filesystem and path handle.
4457+
* @param fileSystem owner
4458+
* @param pathHandle path handle of file to open.
4459+
* @return a builder.
4460+
*/
4461+
@InterfaceAudience.LimitedPrivate("Filesystems")
4462+
@InterfaceStability.Unstable
4463+
protected static FSDataInputStreamBuilder createDataInputStreamBuilder(
4464+
@Nonnull final FileSystem fileSystem,
4465+
@Nonnull final PathHandle pathHandle) {
4466+
return new FSDataInputStreamBuilder(fileSystem, pathHandle);
4467+
}
4468+
44194469
/**
44204470
* Builder returned for {@code #openFile(Path)}
44214471
* and {@code #openFile(PathHandle)}.

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/ChRootedFileSystem.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,12 +32,14 @@
3232
import org.apache.hadoop.fs.CreateFlag;
3333
import org.apache.hadoop.fs.FSDataInputStream;
3434
import org.apache.hadoop.fs.FSDataOutputStream;
35+
import org.apache.hadoop.fs.FSDataOutputStreamBuilder;
3536
import org.apache.hadoop.fs.FileChecksum;
3637
import org.apache.hadoop.fs.FileStatus;
3738
import org.apache.hadoop.fs.FileSystem;
3839
import org.apache.hadoop.fs.FilterFileSystem;
3940
import org.apache.hadoop.fs.FsServerDefaults;
4041
import org.apache.hadoop.fs.FsStatus;
42+
import org.apache.hadoop.fs.FutureDataInputStreamBuilder;
4143
import org.apache.hadoop.fs.LocatedFileStatus;
4244
import org.apache.hadoop.fs.Path;
4345
import org.apache.hadoop.fs.QuotaUsage;
@@ -464,4 +466,14 @@ public void unsetStoragePolicy(Path src) throws IOException {
464466
super.unsetStoragePolicy(fullPath(src));
465467
}
466468

469+
@Override
470+
public FSDataOutputStreamBuilder createFile(final Path path) {
471+
return super.createFile(fullPath(path));
472+
}
473+
474+
@Override
475+
public FutureDataInputStreamBuilder openFile(final Path path)
476+
throws IOException, UnsupportedOperationException {
477+
return super.openFile(fullPath(path));
478+
}
467479
}

0 commit comments

Comments
 (0)