Skip to content

Commit 968b9a6

Browse files
Merge pull request #15 from ABFSDriver/inProgressPurgeFix
Backport HADOOP-18546. ABFS:disable purging list of in progress reads in abfs stream closed apache#5176
2 parents 912c8e0 + d6d6ffe commit 968b9a6

File tree

15 files changed

+286
-35
lines changed

15 files changed

+286
-35
lines changed

hadoop-common-project/hadoop-common/src/main/resources/core-default.xml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2319,6 +2319,12 @@
23192319
<description>The AbstractFileSystem for gs: uris.</description>
23202320
</property>
23212321

2322+
<property>
2323+
<name>fs.azure.enable.readahead</name>
2324+
<value>true</value>
2325+
<description>Enable readahead/prefetching in AbfsInputStream.</description>
2326+
</property>
2327+
23222328
<property>
23232329
<name>io.seqfile.compress.blocksize</name>
23242330
<value>1000000</value>

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -299,6 +299,11 @@ public class AbfsConfiguration{
299299
DefaultValue = DEFAULT_ABFS_LATENCY_TRACK)
300300
private boolean trackLatency;
301301

302+
@BooleanConfigurationValidatorAnnotation(
303+
ConfigurationKey = FS_AZURE_ENABLE_READAHEAD,
304+
DefaultValue = DEFAULT_ENABLE_READAHEAD)
305+
private boolean enabledReadAhead;
306+
302307
@LongConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_SAS_TOKEN_RENEW_PERIOD_FOR_STREAMS,
303308
MinValue = 0,
304309
DefaultValue = DEFAULT_SAS_TOKEN_RENEW_PERIOD_FOR_STREAMS_IN_SECONDS)
@@ -916,6 +921,15 @@ public SASTokenProvider getSASTokenProvider() throws AzureBlobFileSystemExceptio
916921
}
917922
}
918923

924+
public boolean isReadAheadEnabled() {
925+
return this.enabledReadAhead;
926+
}
927+
928+
@VisibleForTesting
929+
void setReadAheadEnabled(final boolean enabledReadAhead) {
930+
this.enabledReadAhead = enabledReadAhead;
931+
}
932+
919933
public int getReadAheadRange() {
920934
return this.readAheadRange;
921935
}

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,7 @@
111111
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_BLOCK_UPLOAD_BUFFER_DIR;
112112
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.BLOCK_UPLOAD_ACTIVE_BLOCKS_DEFAULT;
113113
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DATA_BLOCKS_BUFFER_DEFAULT;
114+
import static org.apache.hadoop.fs.azurebfs.constants.InternalConstants.CAPABILITY_SAFE_READAHEAD;
114115
import static org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapabilityArgs;
115116
import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.logIOStatisticsAtLevel;
116117
import static org.apache.hadoop.util.functional.RemoteIterators.filteringRemoteIterator;
@@ -245,6 +246,7 @@ public String toString() {
245246
sb.append("uri=").append(uri);
246247
sb.append(", user='").append(abfsStore.getUser()).append('\'');
247248
sb.append(", primaryUserGroup='").append(abfsStore.getPrimaryGroup()).append('\'');
249+
sb.append("[" + CAPABILITY_SAFE_READAHEAD + "]");
248250
sb.append('}');
249251
return sb.toString();
250252
}
@@ -1552,6 +1554,11 @@ public boolean hasPathCapability(final Path path, final String capability)
15521554
new TracingContext(clientCorrelationId, fileSystemId,
15531555
FSOperationType.HAS_PATH_CAPABILITY, tracingHeaderFormat,
15541556
listener));
1557+
1558+
// probe for presence of the HADOOP-18546 readahead fix.
1559+
case CAPABILITY_SAFE_READAHEAD:
1560+
return true;
1561+
15551562
default:
15561563
return super.hasPathCapability(p, capability);
15571564
}

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -859,6 +859,7 @@ private AbfsInputStreamContext populateAbfsInputStreamContext(
859859
.withReadBufferSize(abfsConfiguration.getReadBufferSize())
860860
.withReadAheadQueueDepth(abfsConfiguration.getReadAheadQueueDepth())
861861
.withTolerateOobAppends(abfsConfiguration.getTolerateOobAppends())
862+
.isReadAheadEnabled(abfsConfiguration.isReadAheadEnabled())
862863
.withReadSmallFilesCompletely(abfsConfiguration.readSmallFilesCompletely())
863864
.withOptimizeFooterRead(abfsConfiguration.optimizeFooterRead())
864865
.withReadAheadRange(abfsConfiguration.getReadAheadRange())

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -186,6 +186,13 @@ public final class ConfigurationKeys {
186186
public static final String FS_AZURE_SKIP_SUPER_USER_REPLACEMENT = "fs.azure.identity.transformer.skip.superuser.replacement";
187187
public static final String AZURE_KEY_ACCOUNT_KEYPROVIDER = "fs.azure.account.keyprovider";
188188
public static final String AZURE_KEY_ACCOUNT_SHELLKEYPROVIDER_SCRIPT = "fs.azure.shellkeyprovider.script";
189+
190+
/**
191+
* Enable or disable readahead buffer in AbfsInputStream.
192+
* Value: {@value}.
193+
*/
194+
public static final String FS_AZURE_ENABLE_READAHEAD = "fs.azure.enable.readahead";
195+
189196
/** Setting this true will make the driver use its own RemoteIterator implementation */
190197
public static final String FS_AZURE_ENABLE_ABFS_LIST_ITERATOR = "fs.azure.enable.abfslistiterator";
191198
/** Server side encryption key */

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,7 @@ public final class FileSystemConfigurations {
106106
public static final boolean DEFAULT_ABFS_LATENCY_TRACK = false;
107107
public static final long DEFAULT_SAS_TOKEN_RENEW_PERIOD_FOR_STREAMS_IN_SECONDS = 120;
108108

109+
public static final boolean DEFAULT_ENABLE_READAHEAD = true;
109110
public static final String DEFAULT_FS_AZURE_USER_AGENT_PREFIX = EMPTY_STRING;
110111
public static final String DEFAULT_VALUE_UNKNOWN = "UNKNOWN";
111112

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.hadoop.fs.azurebfs.constants;
20+
21+
import org.apache.hadoop.classification.InterfaceAudience;
22+
23+
/**
24+
* Constants which are used internally and which don't fit into the other
25+
* classes.
26+
* For use within the {@code hadoop-azure} module only.
27+
*/
28+
@InterfaceAudience.Private
29+
public final class InternalConstants {
30+
31+
private InternalConstants() {
32+
}
33+
34+
/**
35+
* Does this version of the store have safe readahead?
36+
* Possible combinations of this and the probe
37+
* {@code "fs.capability.etags.available"}.
38+
* <ol>
39+
* <li>{@value}: store is safe</li>
40+
* <li>!etags: store is safe</li>
41+
* <li>etags &amp;&amp; !{@value}: store is <i>UNSAFE</i></li>
42+
* </ol>
43+
*/
44+
public static final String CAPABILITY_SAFE_READAHEAD =
45+
"fs.azure.capability.readahead.safe";
46+
}

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@
5050

5151
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ONE_KB;
5252
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.STREAM_ID_LEN;
53+
import static org.apache.hadoop.fs.azurebfs.constants.InternalConstants.CAPABILITY_SAFE_READAHEAD;
5354
import static org.apache.hadoop.util.StringUtils.toLowerCase;
5455

5556
/**
@@ -137,7 +138,7 @@ public AbfsInputStream(
137138
this.tolerateOobAppends = abfsInputStreamContext.isTolerateOobAppends();
138139
this.eTag = eTag;
139140
this.readAheadRange = abfsInputStreamContext.getReadAheadRange();
140-
this.readAheadEnabled = true;
141+
this.readAheadEnabled = abfsInputStreamContext.isReadAheadEnabled();
141142
this.alwaysReadBufferSize
142143
= abfsInputStreamContext.shouldReadBufferSizeAlways();
143144
this.bufferedPreadDisabled = abfsInputStreamContext
@@ -745,6 +746,11 @@ byte[] getBuffer() {
745746
return buffer;
746747
}
747748

749+
@VisibleForTesting
750+
public boolean isReadAheadEnabled() {
751+
return readAheadEnabled;
752+
}
753+
748754
@VisibleForTesting
749755
public int getReadAheadRange() {
750756
return readAheadRange;
@@ -823,11 +829,12 @@ public IOStatistics getIOStatistics() {
823829
@Override
824830
public String toString() {
825831
final StringBuilder sb = new StringBuilder(super.toString());
832+
sb.append("AbfsInputStream@(").append(this.hashCode()).append("){");
833+
sb.append("[" + CAPABILITY_SAFE_READAHEAD + "]");
826834
if (streamStatistics != null) {
827-
sb.append("AbfsInputStream@(").append(this.hashCode()).append("){");
828-
sb.append(streamStatistics.toString());
829-
sb.append("}");
835+
sb.append(", ").append(streamStatistics);
830836
}
837+
sb.append("}");
831838
return sb.toString();
832839
}
833840

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamContext.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@ public class AbfsInputStreamContext extends AbfsStreamContext {
3535

3636
private boolean tolerateOobAppends;
3737

38+
private boolean isReadAheadEnabled = true;
39+
3840
private boolean alwaysReadBufferSize;
3941

4042
private int readAheadBlockSize;
@@ -72,6 +74,12 @@ public AbfsInputStreamContext withTolerateOobAppends(
7274
return this;
7375
}
7476

77+
public AbfsInputStreamContext isReadAheadEnabled(
78+
final boolean isReadAheadEnabled) {
79+
this.isReadAheadEnabled = isReadAheadEnabled;
80+
return this;
81+
}
82+
7583
public AbfsInputStreamContext withReadAheadRange(
7684
final int readAheadRange) {
7785
this.readAheadRange = readAheadRange;
@@ -141,6 +149,10 @@ public boolean isTolerateOobAppends() {
141149
return tolerateOobAppends;
142150
}
143151

152+
public boolean isReadAheadEnabled() {
153+
return isReadAheadEnabled;
154+
}
155+
144156
public int getReadAheadRange() {
145157
return readAheadRange;
146158
}

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,7 @@ private void init() {
101101

102102
// hide instance constructor
103103
private ReadBufferManager() {
104+
LOGGER.trace("Creating readbuffer manager with HADOOP-18546 patch");
104105
}
105106

106107

@@ -544,7 +545,6 @@ public synchronized void purgeBuffersForStream(AbfsInputStream stream) {
544545
LOGGER.debug("Purging stale buffers for AbfsInputStream {} ", stream);
545546
readAheadQueue.removeIf(readBuffer -> readBuffer.getStream() == stream);
546547
purgeList(stream, completedReadList);
547-
purgeList(stream, inProgressList);
548548
}
549549

550550
/**
@@ -642,4 +642,9 @@ void testMimicFullUseAndAddFailedBuffer(ReadBuffer buf) {
642642
freeList.clear();
643643
completedReadList.add(buf);
644644
}
645+
646+
@VisibleForTesting
647+
int getNumBuffers() {
648+
return NUM_BUFFERS;
649+
}
645650
}

0 commit comments

Comments
 (0)