Skip to content

Commit 8c685dc

Browse files
mukund-thakur authored and snvijaya committed
HADOOP-16965. Refactor abfs stream configuration. (apache#1956)
Contributed by Mukund Thakur.
1 parent ffcba01 commit 8c685dc

File tree

6 files changed

+204
-29
lines changed

6 files changed

+204
-29
lines changed

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java

Lines changed: 22 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,9 @@
8080
import org.apache.hadoop.fs.azurebfs.services.AbfsClient;
8181
import org.apache.hadoop.fs.azurebfs.services.AbfsHttpOperation;
8282
import org.apache.hadoop.fs.azurebfs.services.AbfsInputStream;
83+
import org.apache.hadoop.fs.azurebfs.services.AbfsInputStreamContext;
8384
import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream;
85+
import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStreamContext;
8486
import org.apache.hadoop.fs.azurebfs.services.AbfsPermission;
8587
import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation;
8688
import org.apache.hadoop.fs.azurebfs.services.AuthType;
@@ -413,12 +415,18 @@ public OutputStream createFile(final Path path,
413415
statistics,
414416
AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path),
415417
0,
416-
abfsConfiguration.getWriteBufferSize(),
417-
abfsConfiguration.isFlushEnabled(),
418-
abfsConfiguration.isOutputStreamFlushDisabled());
418+
populateAbfsOutputStreamContext());
419419
}
420420
}
421421

422+
private AbfsOutputStreamContext populateAbfsOutputStreamContext() {
423+
return new AbfsOutputStreamContext()
424+
.withWriteBufferSize(abfsConfiguration.getWriteBufferSize())
425+
.enableFlush(abfsConfiguration.isFlushEnabled())
426+
.disableOutputStreamFlush(abfsConfiguration.isOutputStreamFlushDisabled())
427+
.build();
428+
}
429+
422430
public void createDirectory(final Path path, final FsPermission permission, final FsPermission umask)
423431
throws AzureBlobFileSystemException {
424432
try (AbfsPerfInfo perfInfo = startTracking("createDirectory", "createPath")) {
@@ -464,11 +472,19 @@ public AbfsInputStream openFileForRead(final Path path, final FileSystem.Statist
464472
// Add statistics for InputStream
465473
return new AbfsInputStream(client, statistics,
466474
AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path), contentLength,
467-
abfsConfiguration.getReadBufferSize(), abfsConfiguration.getReadAheadQueueDepth(),
468-
abfsConfiguration.getTolerateOobAppends(), eTag);
475+
populateAbfsInputStreamContext(),
476+
eTag);
469477
}
470478
}
471479

480+
private AbfsInputStreamContext populateAbfsInputStreamContext() {
481+
return new AbfsInputStreamContext()
482+
.withReadBufferSize(abfsConfiguration.getReadBufferSize())
483+
.withReadAheadQueueDepth(abfsConfiguration.getReadAheadQueueDepth())
484+
.withTolerateOobAppends(abfsConfiguration.getTolerateOobAppends())
485+
.build();
486+
}
487+
472488
public OutputStream openFileForWrite(final Path path, final FileSystem.Statistics statistics, final boolean overwrite) throws
473489
AzureBlobFileSystemException {
474490
try (AbfsPerfInfo perfInfo = startTracking("openFileForWrite", "getPathStatus")) {
@@ -500,9 +516,7 @@ public OutputStream openFileForWrite(final Path path, final FileSystem.Statistic
500516
statistics,
501517
AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path),
502518
offset,
503-
abfsConfiguration.getWriteBufferSize(),
504-
abfsConfiguration.isFlushEnabled(),
505-
abfsConfiguration.isOutputStreamFlushDisabled());
519+
populateAbfsOutputStreamContext());
506520
}
507521
}
508522

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java

Lines changed: 9 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -55,21 +55,19 @@ public class AbfsInputStream extends FSInputStream {
5555
private boolean closed = false;
5656

5757
public AbfsInputStream(
58-
final AbfsClient client,
59-
final Statistics statistics,
60-
final String path,
61-
final long contentLength,
62-
final int bufferSize,
63-
final int readAheadQueueDepth,
64-
final boolean tolerateOobAppends,
65-
final String eTag) {
58+
final AbfsClient client,
59+
final Statistics statistics,
60+
final String path,
61+
final long contentLength,
62+
final AbfsInputStreamContext abfsInputStreamContext,
63+
final String eTag) {
6664
this.client = client;
6765
this.statistics = statistics;
6866
this.path = path;
6967
this.contentLength = contentLength;
70-
this.bufferSize = bufferSize;
71-
this.readAheadQueueDepth = (readAheadQueueDepth >= 0) ? readAheadQueueDepth : Runtime.getRuntime().availableProcessors();
72-
this.tolerateOobAppends = tolerateOobAppends;
68+
this.bufferSize = abfsInputStreamContext.getReadBufferSize();
69+
this.readAheadQueueDepth = abfsInputStreamContext.getReadAheadQueueDepth();
70+
this.tolerateOobAppends = abfsInputStreamContext.isTolerateOobAppends();
7371
this.eTag = eTag;
7472
this.readAheadEnabled = true;
7573
}
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.hadoop.fs.azurebfs.services;
20+
21+
/**
22+
* Class to hold extra input stream configs.
23+
*/
24+
public class AbfsInputStreamContext extends AbfsStreamContext {
25+
26+
private int readBufferSize;
27+
28+
private int readAheadQueueDepth;
29+
30+
private boolean tolerateOobAppends;
31+
32+
public AbfsInputStreamContext() {
33+
}
34+
35+
public AbfsInputStreamContext withReadBufferSize(final int readBufferSize) {
36+
this.readBufferSize = readBufferSize;
37+
return this;
38+
}
39+
40+
public AbfsInputStreamContext withReadAheadQueueDepth(
41+
final int readAheadQueueDepth) {
42+
this.readAheadQueueDepth = (readAheadQueueDepth >= 0)
43+
? readAheadQueueDepth
44+
: Runtime.getRuntime().availableProcessors();
45+
return this;
46+
}
47+
48+
public AbfsInputStreamContext withTolerateOobAppends(
49+
final boolean tolerateOobAppends) {
50+
this.tolerateOobAppends = tolerateOobAppends;
51+
return this;
52+
}
53+
54+
public AbfsInputStreamContext build() {
55+
// Validation of parameters to be done here.
56+
return this;
57+
}
58+
59+
public int getReadBufferSize() {
60+
return readBufferSize;
61+
}
62+
63+
public int getReadAheadQueueDepth() {
64+
return readAheadQueueDepth;
65+
}
66+
67+
public boolean isTolerateOobAppends() {
68+
return tolerateOobAppends;
69+
}
70+
}

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -80,23 +80,22 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa
8080
private final Statistics statistics;
8181

8282
public AbfsOutputStream(
83-
final AbfsClient client,
84-
final Statistics statistics,
85-
final String path,
86-
final long position,
87-
final int bufferSize,
88-
final boolean supportFlush,
89-
final boolean disableOutputStreamFlush) {
83+
final AbfsClient client,
84+
final Statistics statistics,
85+
final String path,
86+
final long position,
87+
AbfsOutputStreamContext abfsOutputStreamContext) {
9088
this.client = client;
9189
this.statistics = statistics;
9290
this.path = path;
9391
this.position = position;
9492
this.closed = false;
95-
this.supportFlush = supportFlush;
96-
this.disableOutputStreamFlush = disableOutputStreamFlush;
93+
this.supportFlush = abfsOutputStreamContext.isEnableFlush();
94+
this.disableOutputStreamFlush = abfsOutputStreamContext
95+
.isDisableOutputStreamFlush();
9796
this.lastError = null;
9897
this.lastFlushOffset = 0;
99-
this.bufferSize = bufferSize;
98+
this.bufferSize = abfsOutputStreamContext.getWriteBufferSize();
10099
this.buffer = byteBufferPool.getBuffer(false, bufferSize).array();
101100
this.bufferIndex = 0;
102101
this.writeOperations = new ConcurrentLinkedDeque<>();
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.hadoop.fs.azurebfs.services;
20+
21+
/**
22+
* Class to hold extra output stream configs.
23+
*/
24+
public class AbfsOutputStreamContext extends AbfsStreamContext {
25+
26+
private int writeBufferSize;
27+
28+
private boolean enableFlush;
29+
30+
private boolean disableOutputStreamFlush;
31+
32+
public AbfsOutputStreamContext() {
33+
}
34+
35+
public AbfsOutputStreamContext withWriteBufferSize(
36+
final int writeBufferSize) {
37+
this.writeBufferSize = writeBufferSize;
38+
return this;
39+
}
40+
41+
public AbfsOutputStreamContext enableFlush(final boolean enableFlush) {
42+
this.enableFlush = enableFlush;
43+
return this;
44+
}
45+
46+
public AbfsOutputStreamContext disableOutputStreamFlush(
47+
final boolean disableOutputStreamFlush) {
48+
this.disableOutputStreamFlush = disableOutputStreamFlush;
49+
return this;
50+
}
51+
52+
public AbfsOutputStreamContext build() {
53+
// Validation of parameters to be done here.
54+
return this;
55+
}
56+
57+
public int getWriteBufferSize() {
58+
return writeBufferSize;
59+
}
60+
61+
public boolean isEnableFlush() {
62+
return enableFlush;
63+
}
64+
65+
public boolean isDisableOutputStreamFlush() {
66+
return disableOutputStreamFlush;
67+
}
68+
}
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.hadoop.fs.azurebfs.services;
20+
21+
/**
 * Common parent of the stream configuration holders. Configuration values
 * shared by input and output streams are meant to be stored here.
 */
public abstract class AbfsStreamContext {
  // No shared settings are declared in this class at present.
}

0 commit comments

Comments
 (0)