Skip to content

Commit f324efd

Browse files
sreeb-msftsteveloughran
authored andcommitted
HADOOP-18012. ABFS: Enable config controlled ETag check for Rename idempotency (#5488)
To support recovery of network failures during rename, the abfs client fetches the etag of the source file, and when recovering from a failure, uses this tag to determine whether the rename succeeded before the failure happened. * This works for files, but not directories * It adds the overhead of a HEAD request before each rename. * The option can be disabled by setting "fs.azure.enable.rename.resilience" to false Contributed by Sree Bhattacharyya
1 parent 42ed2b9 commit f324efd

File tree

11 files changed

+622
-97
lines changed

11 files changed

+622
-97
lines changed

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -333,6 +333,10 @@ public class AbfsConfiguration{
333333
FS_AZURE_ENABLE_ABFS_LIST_ITERATOR, DefaultValue = DEFAULT_ENABLE_ABFS_LIST_ITERATOR)
334334
private boolean enableAbfsListIterator;
335335

336+
@BooleanConfigurationValidatorAnnotation(ConfigurationKey =
337+
FS_AZURE_ABFS_RENAME_RESILIENCE, DefaultValue = DEFAULT_ENABLE_ABFS_RENAME_RESILIENCE)
338+
private boolean renameResilience;
339+
336340
public AbfsConfiguration(final Configuration rawConfig, String accountName)
337341
throws IllegalAccessException, InvalidConfigurationValueException, IOException {
338342
this.rawConfig = ProviderUtils.excludeIncompatibleCredentialProviders(
@@ -1139,4 +1143,11 @@ public void setEnableAbfsListIterator(boolean enableAbfsListIterator) {
11391143
this.enableAbfsListIterator = enableAbfsListIterator;
11401144
}
11411145

1146+
public boolean getRenameResilience() {
1147+
return renameResilience;
1148+
}
1149+
1150+
void setRenameResilience(boolean actualResilience) {
1151+
renameResilience = actualResilience;
1152+
}
11421153
}

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -201,9 +201,9 @@ public void initialize(URI uri, Configuration configuration)
201201
tracingHeaderFormat = abfsConfiguration.getTracingHeaderFormat();
202202
this.setWorkingDirectory(this.getHomeDirectory());
203203

204+
TracingContext tracingContext = new TracingContext(clientCorrelationId,
205+
fileSystemId, FSOperationType.CREATE_FILESYSTEM, tracingHeaderFormat, listener);
204206
if (abfsConfiguration.getCreateRemoteFileSystemDuringInitialization()) {
205-
TracingContext tracingContext = new TracingContext(clientCorrelationId,
206-
fileSystemId, FSOperationType.CREATE_FILESYSTEM, tracingHeaderFormat, listener);
207207
if (this.tryGetFileStatus(new Path(AbfsHttpConstants.ROOT_PATH), tracingContext) == null) {
208208
try {
209209
this.createFileSystem(tracingContext);
@@ -442,7 +442,7 @@ public boolean rename(final Path src, final Path dst) throws IOException {
442442
}
443443

444444
// Non-HNS account need to check dst status on driver side.
445-
if (!abfsStore.getIsNamespaceEnabled(tracingContext) && dstFileStatus == null) {
445+
if (!getIsNamespaceEnabled(tracingContext) && dstFileStatus == null) {
446446
dstFileStatus = tryGetFileStatus(qualifiedDstPath, tracingContext);
447447
}
448448

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -923,9 +923,11 @@ public boolean rename(final Path source,
923923

924924
do {
925925
try (AbfsPerfInfo perfInfo = startTracking("rename", "renamePath")) {
926+
boolean isNamespaceEnabled = getIsNamespaceEnabled(tracingContext);
926927
final AbfsClientRenameResult abfsClientRenameResult =
927928
client.renamePath(sourceRelativePath, destinationRelativePath,
928-
continuation, tracingContext, sourceEtag, false);
929+
continuation, tracingContext, sourceEtag, false,
930+
isNamespaceEnabled);
929931

930932
AbfsRestOperation op = abfsClientRenameResult.getOp();
931933
perfInfo.registerResult(op.getResult());

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -238,6 +238,9 @@ public final class ConfigurationKeys {
238238
/** Key for rate limit capacity, as used by IO operations which try to throttle themselves. */
239239
public static final String FS_AZURE_ABFS_IO_RATE_LIMIT = "fs.azure.io.rate.limit";
240240

241+
/** Add extra resilience to rename failures, at the expense of performance. */
242+
public static final String FS_AZURE_ABFS_RENAME_RESILIENCE = "fs.azure.enable.rename.resilience";
243+
241244
public static String accountProperty(String property, String account) {
242245
return property + "." + account;
243246
}

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,7 @@ public final class FileSystemConfigurations {
118118

119119
public static final int STREAM_ID_LEN = 12;
120120
public static final boolean DEFAULT_ENABLE_ABFS_LIST_ITERATOR = true;
121+
public static final boolean DEFAULT_ENABLE_ABFS_RENAME_RESILIENCE = true;
121122

122123
/**
123124
* Limit of queued block upload operations before writes

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java

Lines changed: 136 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@
5555
import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants;
5656
import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations;
5757
import org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams;
58+
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
5859
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
5960
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidUriException;
6061
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.SASTokenProviderException;
@@ -68,6 +69,7 @@
6869
import org.apache.hadoop.security.ssl.DelegatingSSLSocketFactory;
6970
import org.apache.hadoop.util.concurrent.HadoopExecutors;
7071

72+
import static org.apache.commons.lang3.StringUtils.isEmpty;
7173
import static org.apache.commons.lang3.StringUtils.isNotEmpty;
7274
import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.RENAME_PATH_ATTEMPTS;
7375
import static org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore.extractEtagHeader;
@@ -77,8 +79,8 @@
7779
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemUriSchemes.HTTPS_SCHEME;
7880
import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.*;
7981
import static org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams.*;
80-
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
8182
import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.RENAME_DESTINATION_PARENT_PATH_NOT_FOUND;
83+
import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.SOURCE_PATH_NOT_FOUND;
8284

8385
/**
8486
* AbfsClient.
@@ -106,9 +108,12 @@ public class AbfsClient implements Closeable {
106108

107109
private final ListeningScheduledExecutorService executorService;
108110

109-
/** logging the rename failure if metadata is in an incomplete state. */
110-
private static final LogExactlyOnce ABFS_METADATA_INCOMPLETE_RENAME_FAILURE =
111-
new LogExactlyOnce(LOG);
111+
private boolean renameResilience;
112+
113+
/**
114+
* logging the rename failure if metadata is in an incomplete state.
115+
*/
116+
private static final LogExactlyOnce ABFS_METADATA_INCOMPLETE_RENAME_FAILURE = new LogExactlyOnce(LOG);
112117

113118
private AbfsClient(final URL baseUrl, final SharedKeyCredentials sharedKeyCredentials,
114119
final AbfsConfiguration abfsConfiguration,
@@ -123,6 +128,7 @@ private AbfsClient(final URL baseUrl, final SharedKeyCredentials sharedKeyCreden
123128
this.accountName = abfsConfiguration.getAccountName().substring(0, abfsConfiguration.getAccountName().indexOf(AbfsHttpConstants.DOT));
124129
this.authType = abfsConfiguration.getAuthType(accountName);
125130
this.intercept = AbfsThrottlingInterceptFactory.getInstance(accountName, abfsConfiguration);
131+
this.renameResilience = abfsConfiguration.getRenameResilience();
126132

127133
String encryptionKey = this.abfsConfiguration
128134
.getClientProvidedEncryptionKey();
@@ -504,27 +510,55 @@ public AbfsRestOperation breakLease(final String path,
504510
* took place.
505511
* As rename recovery is only attempted if the source etag is non-empty,
506512
* in normal rename operations rename recovery will never happen.
507-
* @param source path to source file
508-
* @param destination destination of rename.
509-
* @param continuation continuation.
510-
* @param tracingContext trace context
511-
* @param sourceEtag etag of source file. may be null or empty
513+
*
514+
* @param source path to source file
515+
* @param destination destination of rename.
516+
* @param continuation continuation.
517+
* @param tracingContext trace context
518+
* @param sourceEtag etag of source file. may be null or empty
512519
* @param isMetadataIncompleteState was there a rename failure due to
513520
* incomplete metadata state?
521+
* @param isNamespaceEnabled whether namespace enabled account or not
514522
* @return AbfsClientRenameResult result of rename operation indicating the
515523
* AbfsRest operation, rename recovery and incomplete metadata state failure.
516524
* @throws AzureBlobFileSystemException failure, excluding any recovery from overload failures.
517525
*/
518526
public AbfsClientRenameResult renamePath(
519-
final String source,
520-
final String destination,
521-
final String continuation,
522-
final TracingContext tracingContext,
523-
final String sourceEtag,
524-
boolean isMetadataIncompleteState)
527+
final String source,
528+
final String destination,
529+
final String continuation,
530+
final TracingContext tracingContext,
531+
String sourceEtag,
532+
boolean isMetadataIncompleteState,
533+
boolean isNamespaceEnabled)
525534
throws AzureBlobFileSystemException {
526535
final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
527536

537+
final boolean hasEtag = !isEmpty(sourceEtag);
538+
539+
boolean shouldAttemptRecovery = renameResilience && isNamespaceEnabled;
540+
if (!hasEtag && shouldAttemptRecovery) {
541+
// in case eTag is already not supplied to the API
542+
// and rename resilience is expected and it is an HNS enabled account
543+
// fetch the source etag to be used later in recovery
544+
try {
545+
final AbfsRestOperation srcStatusOp = getPathStatus(source,
546+
false, tracingContext);
547+
if (srcStatusOp.hasResult()) {
548+
final AbfsHttpOperation result = srcStatusOp.getResult();
549+
sourceEtag = extractEtagHeader(result);
550+
// and update the directory status.
551+
boolean isDir = checkIsDir(result);
552+
shouldAttemptRecovery = !isDir;
553+
LOG.debug("Retrieved etag of source for rename recovery: {}; isDir={}", sourceEtag, isDir);
554+
}
555+
} catch (AbfsRestOperationException e) {
556+
throw new AbfsRestOperationException(e.getStatusCode(), SOURCE_PATH_NOT_FOUND.getErrorCode(),
557+
e.getMessage(), e);
558+
}
559+
560+
}
561+
528562
String encodedRenameSource = urlEncode(FORWARD_SLASH + this.getFileSystem() + source);
529563
if (authType == AuthType.SAS) {
530564
final AbfsUriQueryBuilder srcQueryBuilder = new AbfsUriQueryBuilder();
@@ -541,12 +575,7 @@ public AbfsClientRenameResult renamePath(
541575
appendSASTokenToQuery(destination, SASTokenProvider.RENAME_DESTINATION_OPERATION, abfsUriQueryBuilder);
542576

543577
final URL url = createRequestUrl(destination, abfsUriQueryBuilder.toString());
544-
final AbfsRestOperation op = new AbfsRestOperation(
545-
AbfsRestOperationType.RenamePath,
546-
this,
547-
HTTP_METHOD_PUT,
548-
url,
549-
requestHeaders);
578+
final AbfsRestOperation op = createRenameRestOperation(url, requestHeaders);
550579
try {
551580
incrementAbfsRenamePath();
552581
op.execute(tracingContext);
@@ -557,48 +586,74 @@ public AbfsClientRenameResult renamePath(
557586
// isMetadataIncompleteState is used for renameRecovery(as the 2nd param).
558587
return new AbfsClientRenameResult(op, isMetadataIncompleteState, isMetadataIncompleteState);
559588
} catch (AzureBlobFileSystemException e) {
560-
// If we have no HTTP response, throw the original exception.
561-
if (!op.hasResult()) {
562-
throw e;
563-
}
564-
565-
// ref: HADOOP-18242. Rename failure occurring due to a rare case of
566-
// tracking metadata being in incomplete state.
567-
if (op.getResult().getStorageErrorCode()
568-
.equals(RENAME_DESTINATION_PARENT_PATH_NOT_FOUND.getErrorCode())
569-
&& !isMetadataIncompleteState) {
570-
// Logging once
571-
ABFS_METADATA_INCOMPLETE_RENAME_FAILURE
572-
.info("Rename Failure attempting to resolve tracking metadata state and retrying.");
589+
// If we have no HTTP response, throw the original exception.
590+
if (!op.hasResult()) {
591+
throw e;
592+
}
573593

594+
// ref: HADOOP-18242. Rename failure occurring due to a rare case of
595+
// tracking metadata being in incomplete state.
596+
if (op.getResult().getStorageErrorCode()
597+
.equals(RENAME_DESTINATION_PARENT_PATH_NOT_FOUND.getErrorCode())
598+
&& !isMetadataIncompleteState) {
599+
//Logging
600+
ABFS_METADATA_INCOMPLETE_RENAME_FAILURE
601+
.info("Rename Failure attempting to resolve tracking metadata state and retrying.");
602+
// rename recovery should be attempted in this case also
603+
shouldAttemptRecovery = true;
604+
isMetadataIncompleteState = true;
605+
String sourceEtagAfterFailure = sourceEtag;
606+
if (isEmpty(sourceEtagAfterFailure)) {
574607
// Doing a HEAD call resolves the incomplete metadata state and
575608
// then we can retry the rename operation.
576609
AbfsRestOperation sourceStatusOp = getPathStatus(source, false,
577610
tracingContext);
578-
isMetadataIncompleteState = true;
579611
// Extract the sourceEtag, using the status Op, and set it
580612
// for future rename recovery.
581613
AbfsHttpOperation sourceStatusResult = sourceStatusOp.getResult();
582-
String sourceEtagAfterFailure = extractEtagHeader(sourceStatusResult);
583-
renamePath(source, destination, continuation, tracingContext,
584-
sourceEtagAfterFailure, isMetadataIncompleteState);
585-
}
586-
// if we get out of the condition without a successful rename, then
587-
// it isn't metadata incomplete state issue.
588-
isMetadataIncompleteState = false;
589-
590-
boolean etagCheckSucceeded = renameIdempotencyCheckOp(
591-
source,
592-
sourceEtag, op, destination, tracingContext);
593-
if (!etagCheckSucceeded) {
594-
// idempotency did not return different result
595-
// throw back the exception
596-
throw e;
614+
sourceEtagAfterFailure = extractEtagHeader(sourceStatusResult);
597615
}
616+
renamePath(source, destination, continuation, tracingContext,
617+
sourceEtagAfterFailure, isMetadataIncompleteState, isNamespaceEnabled);
618+
}
619+
// if we get out of the condition without a successful rename, then
620+
// it isn't metadata incomplete state issue.
621+
isMetadataIncompleteState = false;
622+
623+
// setting default rename recovery success to false
624+
boolean etagCheckSucceeded = false;
625+
if (shouldAttemptRecovery) {
626+
etagCheckSucceeded = renameIdempotencyCheckOp(
627+
source,
628+
sourceEtag, op, destination, tracingContext);
629+
}
630+
if (!etagCheckSucceeded) {
631+
// idempotency did not return different result
632+
// throw back the exception
633+
throw e;
634+
}
598635
return new AbfsClientRenameResult(op, true, isMetadataIncompleteState);
599636
}
600637
}
601638

639+
private boolean checkIsDir(AbfsHttpOperation result) {
640+
String resourceType = result.getResponseHeader(
641+
HttpHeaderConfigurations.X_MS_RESOURCE_TYPE);
642+
return resourceType != null
643+
&& resourceType.equalsIgnoreCase(AbfsHttpConstants.DIRECTORY);
644+
}
645+
646+
@VisibleForTesting
647+
AbfsRestOperation createRenameRestOperation(URL url, List<AbfsHttpHeader> requestHeaders) {
648+
AbfsRestOperation op = new AbfsRestOperation(
649+
AbfsRestOperationType.RenamePath,
650+
this,
651+
HTTP_METHOD_PUT,
652+
url,
653+
requestHeaders);
654+
return op;
655+
}
656+
602657
private void incrementAbfsRenamePath() {
603658
abfsCounters.incrementCounter(RENAME_PATH_ATTEMPTS, 1);
604659
}
@@ -628,28 +683,44 @@ public boolean renameIdempotencyCheckOp(
628683
TracingContext tracingContext) {
629684
Preconditions.checkArgument(op.hasResult(), "Operations has null HTTP response");
630685

631-
if ((op.isARetriedRequest())
632-
&& (op.getResult().getStatusCode() == HttpURLConnection.HTTP_NOT_FOUND)
633-
&& isNotEmpty(sourceEtag)) {
634-
635-
// Server has returned HTTP 404, which means rename source no longer
636-
// exists. Check on destination status and if its etag matches
637-
// that of the source, consider it to be a success.
638-
LOG.debug("rename {} to {} failed, checking etag of destination",
639-
source, destination);
686+
// removing isDir from debug logs as it can be misleading
687+
LOG.debug("rename({}, {}) failure {}; retry={} etag {}",
688+
source, destination, op.getResult().getStatusCode(), op.isARetriedRequest(), sourceEtag);
689+
if (!(op.isARetriedRequest()
690+
&& (op.getResult().getStatusCode() == HttpURLConnection.HTTP_NOT_FOUND))) {
691+
// only attempt recovery if the failure was a 404 on a retried rename request.
692+
return false;
693+
}
640694

695+
if (isNotEmpty(sourceEtag)) {
696+
// Server has returned HTTP 404, we have an etag, so see
697+
// if the rename has actually taken place,
698+
LOG.info("rename {} to {} failed, checking etag of destination",
699+
source, destination);
641700
try {
642-
final AbfsRestOperation destStatusOp = getPathStatus(destination,
643-
false, tracingContext);
701+
final AbfsRestOperation destStatusOp = getPathStatus(destination, false, tracingContext);
644702
final AbfsHttpOperation result = destStatusOp.getResult();
645703

646-
return result.getStatusCode() == HttpURLConnection.HTTP_OK
647-
&& sourceEtag.equals(extractEtagHeader(result));
648-
} catch (AzureBlobFileSystemException ignored) {
704+
final boolean recovered = result.getStatusCode() == HttpURLConnection.HTTP_OK
705+
&& sourceEtag.equals(extractEtagHeader(result));
706+
LOG.info("File rename has taken place: recovery {}",
707+
recovered ? "succeeded" : "failed");
708+
return recovered;
709+
710+
} catch (AzureBlobFileSystemException ex) {
649711
// GetFileStatus on the destination failed, the rename did not take place
712+
// or some other failure. log and swallow.
713+
LOG.debug("Failed to get status of path {}", destination, ex);
650714
}
715+
} else {
716+
LOG.debug("No source etag; unable to probe for the operation's success");
651717
}
652-
return false;
718+
return false;
719+
}
720+
721+
@VisibleForTesting
722+
boolean isSourceDestEtagEqual(String sourceEtag, AbfsHttpOperation result) {
723+
return sourceEtag.equals(extractEtagHeader(result));
653724
}
654725

655726
public AbfsRestOperation append(final String path, final byte[] buffer,

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientRenameResult.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,4 +58,16 @@ public boolean isRenameRecovered() {
5858
public boolean isIncompleteMetadataState() {
5959
return isIncompleteMetadataState;
6060
}
61+
62+
@Override
63+
public String toString() {
64+
return "AbfsClientRenameResult{"
65+
+ "op="
66+
+ op
67+
+ ", renameRecovered="
68+
+ renameRecovered
69+
+ ", isIncompleteMetadataState="
70+
+ isIncompleteMetadataState
71+
+ '}';
72+
}
6173
}

0 commit comments

Comments
 (0)