Skip to content

Commit 1dcd6ce

Browse files
committed
Namespace check + IOStats counters test
1 parent 4426ad7 commit 1dcd6ce

File tree

2 files changed

+107
-27
lines changed

2 files changed

+107
-27
lines changed

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java

Lines changed: 29 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@
7878
import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.*;
7979
import static org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams.*;
8080
import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.RENAME_DESTINATION_PARENT_PATH_NOT_FOUND;
81+
import static org.eclipse.jetty.util.StringUtil.isEmpty;
8182

8283
/**
8384
* AbfsClient.
@@ -105,6 +106,8 @@ public class AbfsClient implements Closeable {
105106

106107
private final ListeningScheduledExecutorService executorService;
107108

109+
private final boolean renameResilience;
110+
108111
/** logging the rename failure if metadata is in an incomplete state. */
109112
private static final LogExactlyOnce ABFS_METADATA_INCOMPLETE_RENAME_FAILURE =
110113
new LogExactlyOnce(LOG);
@@ -122,6 +125,9 @@ private AbfsClient(final URL baseUrl, final SharedKeyCredentials sharedKeyCreden
122125
this.accountName = abfsConfiguration.getAccountName().substring(0, abfsConfiguration.getAccountName().indexOf(AbfsHttpConstants.DOT));
123126
this.authType = abfsConfiguration.getAuthType(accountName);
124127
this.intercept = AbfsThrottlingInterceptFactory.getInstance(accountName, abfsConfiguration);
128+
this.renameResilience = abfsConfiguration.getRenameResilience();
129+
130+
125131

126132
String encryptionKey = this.abfsConfiguration
127133
.getClientProvidedEncryptionKey();
@@ -524,12 +530,16 @@ public AbfsClientRenameResult renamePath(
524530
throws AzureBlobFileSystemException {
525531
final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
526532

527-
if (sourceEtag == null || sourceEtag.isEmpty()) {
533+
final boolean hasEtag = !isEmpty(sourceEtag);
534+
boolean isDir = !hasEtag;
535+
if (!hasEtag && renameResilience) {
528536
final AbfsRestOperation srcStatusOp = getPathStatus(source,
529537
false, tracingContext);
530538
final AbfsHttpOperation result = srcStatusOp.getResult();
531539

532540
sourceEtag = extractEtagHeader(result);
541+
542+
isDir = isEmpty(sourceEtag);
533543
}
534544

535545
String encodedRenameSource = urlEncode(FORWARD_SLASH + this.getFileSystem() + source);
@@ -591,7 +601,7 @@ public AbfsClientRenameResult renamePath(
591601

592602
boolean etagCheckSucceeded = renameIdempotencyCheckOp(
593603
source,
594-
sourceEtag, op, destination, tracingContext);
604+
sourceEtag, op, destination, tracingContext, isDir);
595605
if (!etagCheckSucceeded) {
596606
// idempotency did not return different result
597607
// throw back the exception
@@ -638,7 +648,8 @@ public boolean renameIdempotencyCheckOp(
638648
final String sourceEtag,
639649
final AbfsRestOperation op,
640650
final String destination,
641-
TracingContext tracingContext) {
651+
TracingContext tracingContext,
652+
final boolean isDir) {
642653
Preconditions.checkArgument(op.hasResult(), "Operations has null HTTP response");
643654

644655
if ((op.isARetriedRequest())
@@ -651,20 +662,34 @@ && isNotEmpty(sourceEtag)) {
651662
LOG.debug("rename {} to {} failed, checking etag of destination",
652663
source, destination);
653664

665+
if (isDir) {
666+
// directory recovery is not supported.
667+
// log and fail.
668+
LOG.info("rename directory {} to {} failed; unable to recover",
669+
source, destination);
670+
return false;
671+
}
672+
654673
try {
655674
final AbfsRestOperation destStatusOp = getPathStatus(destination,
656675
false, tracingContext);
657676
final AbfsHttpOperation result = destStatusOp.getResult();
658677

659678
return result.getStatusCode() == HttpURLConnection.HTTP_OK
660-
&& sourceEtag.equals(extractEtagHeader(result));
679+
&& isSourceDestEtagEqual(sourceEtag, result);
680+
//sourceEtag.equals(extractEtagHeader(result));
661681
} catch (AzureBlobFileSystemException ignored) {
662682
// GetFileStatus on the destination failed, the rename did not take place
663683
}
664684
}
665685
return false;
666686
}
667687

688+
@VisibleForTesting
689+
boolean isSourceDestEtagEqual(String sourceEtag, AbfsHttpOperation result) {
690+
return sourceEtag.equals(extractEtagHeader(result));
691+
}
692+
668693
public AbfsRestOperation append(final String path, final byte[] buffer,
669694
AppendRequestParameters reqParams, final String cachedSasToken, TracingContext tracingContext)
670695
throws AzureBlobFileSystemException {

hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsRenameRetryRecovery.java

Lines changed: 78 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -18,32 +18,32 @@
1818

1919
package org.apache.hadoop.fs.azurebfs.services;
2020

21-
import org.apache.hadoop.fs.Path;
22-
import org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode;
23-
import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
21+
import java.io.IOException;
22+
import java.net.HttpURLConnection;
23+
import java.net.URL;
24+
import java.util.List;
25+
import java.util.Optional;
26+
import javax.net.ssl.HttpsURLConnection;
27+
28+
import org.apache.hadoop.fs.azurebfs.AbfsStatistic;
29+
import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore;
2430
import org.assertj.core.api.Assertions;
25-
import org.junit.Assert;
2631
import org.junit.Assume;
2732
import org.junit.Test;
2833
import org.mockito.Mockito;
2934
import org.slf4j.Logger;
3035
import org.slf4j.LoggerFactory;
3136

37+
import org.apache.hadoop.fs.Path;
38+
3239
import org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest;
3340
import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem;
3441
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
3542
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
36-
37-
import javax.net.ssl.HttpsURLConnection;
38-
39-
import java.io.IOException;
40-
import java.net.HttpURLConnection;
41-
import java.net.URL;
42-
import java.util.List;
43+
import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
4344

4445
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_PUT;
4546
import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.RENAME_DESTINATION_PARENT_PATH_NOT_FOUND;
46-
import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.SOURCE_PATH_NOT_FOUND;
4747
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
4848
import static org.mockito.ArgumentMatchers.nullable;
4949
import static org.mockito.Mockito.doReturn;
@@ -203,45 +203,100 @@ AbfsClient getMockAbfsClient() throws IOException {
203203
@Test
204204
public void testRenameRecoverySrcDestEtagSame() throws IOException {
205205
AzureBlobFileSystem fs = getFileSystem();
206+
AzureBlobFileSystemStore abfsStore = fs.getAbfsStore();
206207
TracingContext testTracingContext = getTestTracingContext(fs, false);
207208

208209
Assume.assumeTrue(fs.getAbfsStore().getIsNamespaceEnabled(testTracingContext));
209210

210211
AbfsClient mockClient = getMockAbfsClient();
211212

213+
// simulating eTag check returns true
214+
// source and destination eTag equal
215+
//
216+
Mockito.doReturn(true).when(mockClient).isSourceDestEtagEqual(
217+
nullable(String.class), nullable(AbfsHttpOperation.class)
218+
);
212219

213220
String path1 = "/dummyFile1";
214-
String path2 = "/dummyFile2";
215-
221+
String path2 = "dummyFile2";
216222
fs.create(new Path(path1));
217223
fs.create(new Path(path2));
218224

225+
abfsStore.setClient(mockClient);
226+
227+
// checking correct count in AbfsCounters
228+
AbfsCounters counter = mockClient.getAbfsCounters();
229+
Long connMadeBeforeRename = counter.getIOStatistics().counters().
230+
get(AbfsStatistic.CONNECTIONS_MADE.getStatName());
231+
Long renamePathAttemptsBeforeRename = counter.getIOStatistics().counters().
232+
get(AbfsStatistic.RENAME_PATH_ATTEMPTS.getStatName());
233+
219234
// 404 and retry, send sourceEtag as null
220235
// source eTag matches -> rename should pass even when execute throws exception
221-
mockClient.renamePath(path1, path1, null, testTracingContext, null, false);
236+
fs.rename(new Path(path1), new Path(path2));
237+
238+
// validating stat counters after rename
239+
Long connMadeAfterRename = counter.getIOStatistics().counters().
240+
get(AbfsStatistic.CONNECTIONS_MADE.getStatName());
241+
Long renamePathAttemptsAfterRename = counter.getIOStatistics().counters().
242+
get(AbfsStatistic.RENAME_PATH_ATTEMPTS.getStatName());
243+
244+
// 4 calls should have happened in total for rename
245+
// 1 -> original rename rest call, 2 -> first retry,
246+
// +2 for getPathStatus calls
247+
assertEquals(Long.valueOf(connMadeBeforeRename+4), connMadeAfterRename);
248+
249+
// the RENAME_PATH_ATTEMPTS stat should be incremented by 1
250+
// retries happen internally within AbfsRestOperation execute()
251+
// the stat for RENAME_PATH_ATTEMPTS is updated only once before execute() is called
252+
assertEquals(Long.valueOf(renamePathAttemptsBeforeRename+1), renamePathAttemptsAfterRename);
253+
222254
}
223255

224256
@Test
225-
public void testRenameRecoverySrcDestEtagDifferent() throws IOException {
257+
public void testRenameRecoverySrcDestEtagDifferent() throws Exception {
226258
AzureBlobFileSystem fs = getFileSystem();
259+
AzureBlobFileSystemStore abfsStore = fs.getAbfsStore();
227260
TracingContext testTracingContext = getTestTracingContext(fs, false);
228261

229262
Assume.assumeTrue(fs.getAbfsStore().getIsNamespaceEnabled(testTracingContext));
230263

231-
AbfsClient spyClient = getMockAbfsClient();
264+
AbfsClient mockClient = getMockAbfsClient();
232265

233266
String path1 = "/dummyFile1";
234267
String path2 = "/dummyFile2";
235268

236269
fs.create(new Path(path1));
237270
fs.create(new Path(path2));
238271

239-
// source eTag does not match -> throw exception
240-
try {
241-
spyClient.renamePath(path1, path2,null, testTracingContext, null, false);
242-
} catch (AbfsRestOperationException e) {
243-
Assert.assertEquals(SOURCE_PATH_NOT_FOUND, e.getErrorCode());
244-
}
272+
abfsStore.setClient(mockClient);
273+
274+
// checking correct count in AbfsCounters
275+
AbfsCounters counter = mockClient.getAbfsCounters();
276+
Long connMadeBeforeRename = counter.getIOStatistics().counters().
277+
get(AbfsStatistic.CONNECTIONS_MADE.getStatName());
278+
Long renamePathAttemptsBeforeRename = counter.getIOStatistics().counters().
279+
get(AbfsStatistic.RENAME_PATH_ATTEMPTS.getStatName());
280+
281+
// source eTag does not match -> rename should be a failure
282+
boolean renameResult = fs.rename(new Path(path1), new Path(path2));
283+
assertEquals(false, renameResult);
284+
285+
// validating stat counters after rename
286+
Long connMadeAfterRename = counter.getIOStatistics().counters().
287+
get(AbfsStatistic.CONNECTIONS_MADE.getStatName());
288+
Long renamePathAttemptsAfterRename = counter.getIOStatistics().counters().
289+
get(AbfsStatistic.RENAME_PATH_ATTEMPTS.getStatName());
290+
291+
// 4 calls should have happened in total for rename
292+
// 1 -> original rename rest call, 2 -> first retry,
293+
// +2 for getPathStatus calls
294+
assertEquals(Long.valueOf(connMadeBeforeRename+4), connMadeAfterRename);
295+
296+
// the RENAME_PATH_ATTEMPTS stat should be incremented by 1
297+
// retries happen internally within AbfsRestOperation execute()
298+
// the stat for RENAME_PATH_ATTEMPTS is updated only once before execute() is called
299+
assertEquals(Long.valueOf(renamePathAttemptsBeforeRename+1), renamePathAttemptsAfterRename);
245300
}
246301

247302
/**

0 commit comments

Comments
 (0)