Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,14 @@ public class AbfsConfiguration{
DefaultValue = DEFAULT_MAX_BACKOFF_INTERVAL)
private int maxBackoffInterval;

@BooleanConfigurationValidatorAnnotation(ConfigurationKey = AZURE_STATIC_RETRY_FOR_CONNECTION_TIMEOUT_ENABLED,
DefaultValue = DEFAULT_STATIC_RETRY_FOR_CONNECTION_TIMEOUT_ENABLED)
private boolean staticRetryForConnectionTimeoutEnabled;

@IntegerConfigurationValidatorAnnotation(ConfigurationKey = AZURE_STATIC_RETRY_INTERVAL,
DefaultValue = DEFAULT_STATIC_RETRY_INTERVAL)
private int staticRetryInterval;

@IntegerConfigurationValidatorAnnotation(ConfigurationKey = AZURE_BACKOFF_INTERVAL,
DefaultValue = DEFAULT_BACKOFF_INTERVAL)
private int backoffInterval;
Expand All @@ -166,6 +174,14 @@ public class AbfsConfiguration{
DefaultValue = DEFAULT_CUSTOM_TOKEN_FETCH_RETRY_COUNT)
private int customTokenFetchRetryCount;

@IntegerConfigurationValidatorAnnotation(ConfigurationKey = AZURE_HTTP_CONNECTION_TIMEOUT,
DefaultValue = DEFAULT_HTTP_CONNECTION_TIMEOUT)
private int httpConnectionTimeout;

@IntegerConfigurationValidatorAnnotation(ConfigurationKey = AZURE_HTTP_READ_TIMEOUT,
DefaultValue = DEFAULT_HTTP_READ_TIMEOUT)
private int httpReadTimeout;

@IntegerConfigurationValidatorAnnotation(ConfigurationKey = AZURE_OAUTH_TOKEN_FETCH_RETRY_COUNT,
MinValue = 0,
DefaultValue = DEFAULT_AZURE_OAUTH_TOKEN_FETCH_RETRY_MAX_ATTEMPTS)
Expand Down Expand Up @@ -343,8 +359,11 @@ public class AbfsConfiguration{
FS_AZURE_ABFS_RENAME_RESILIENCE, DefaultValue = DEFAULT_ENABLE_ABFS_RENAME_RESILIENCE)
private boolean renameResilience;

private String clientProvidedEncryptionKey;
@BooleanConfigurationValidatorAnnotation(ConfigurationKey =
FS_AZURE_ABFS_ENABLE_CHECKSUM_VALIDATION, DefaultValue = DEFAULT_ENABLE_ABFS_CHECKSUM_VALIDATION)
private boolean isChecksumValidationEnabled;

private String clientProvidedEncryptionKey;
private String clientProvidedEncryptionKeySHA;

public AbfsConfiguration(final Configuration rawConfig, String accountName)
Expand Down Expand Up @@ -669,6 +688,14 @@ public int getMaxBackoffIntervalMilliseconds() {
return this.maxBackoffInterval;
}

public boolean getStaticRetryForConnectionTimeoutEnabled() {
return staticRetryForConnectionTimeoutEnabled;
}

public int getStaticRetryInterval() {
return staticRetryInterval;
}

public int getBackoffIntervalMilliseconds() {
return this.backoffInterval;
}
Expand All @@ -681,6 +708,14 @@ public int getCustomTokenFetchRetryCount() {
return this.customTokenFetchRetryCount;
}

public int getHttpConnectionTimeout() {
return this.httpConnectionTimeout;
}

public int getHttpReadTimeout() {
return this.httpReadTimeout;
}

public long getAzureBlockSize() {
return this.azureBlockSize;
}
Expand Down Expand Up @@ -1208,4 +1243,13 @@ public boolean getRenameResilience() {
void setRenameResilience(boolean actualResilience) {
renameResilience = actualResilience;
}

public boolean getIsChecksumValidationEnabled() {
return isChecksumValidationEnabled;
}

@VisibleForTesting
public void setIsChecksumValidationEnabled(boolean isChecksumValidationEnabled) {
this.isChecksumValidationEnabled = isChecksumValidationEnabled;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@
import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation;
import org.apache.hadoop.fs.azurebfs.services.AuthType;
import org.apache.hadoop.fs.azurebfs.services.ExponentialRetryPolicy;
import org.apache.hadoop.fs.azurebfs.services.StaticRetryPolicy;
import org.apache.hadoop.fs.azurebfs.services.AbfsLease;
import org.apache.hadoop.fs.azurebfs.services.SharedKeyCredentials;
import org.apache.hadoop.fs.azurebfs.services.AbfsPerfTracker;
Expand Down Expand Up @@ -1781,6 +1782,8 @@ private AbfsClientContext populateAbfsClientContext() {
return new AbfsClientContextBuilder()
.withExponentialRetryPolicy(
new ExponentialRetryPolicy(abfsConfiguration))
.withStaticRetryPolicy(
new StaticRetryPolicy(abfsConfiguration))
.withAbfsCounters(abfsCounters)
.withAbfsPerfTracker(abfsPerfTracker)
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ public final class AbfsHttpConstants {
public static final String FORWARD_SLASH_ENCODE = "%2F";
public static final String AZURE_DISTRIBUTED_FILE_SYSTEM_AUTHORITY_DELIMITER = "@";
public static final String UTF_8 = "utf-8";
public static final String MD5 = "MD5";
public static final String GMT_TIMEZONE = "GMT";
public static final String APPLICATION_JSON = "application/json";
public static final String APPLICATION_OCTET_STREAM = "application/octet-stream";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,23 @@ public final class ConfigurationKeys {
// Retry strategy defined by the user
public static final String AZURE_MIN_BACKOFF_INTERVAL = "fs.azure.io.retry.min.backoff.interval";
public static final String AZURE_MAX_BACKOFF_INTERVAL = "fs.azure.io.retry.max.backoff.interval";
public static final String AZURE_STATIC_RETRY_FOR_CONNECTION_TIMEOUT_ENABLED = "fs.azure.static.retry.for.connection.timeout.enabled";
public static final String AZURE_STATIC_RETRY_INTERVAL = "fs.azure.static.retry.interval";
public static final String AZURE_BACKOFF_INTERVAL = "fs.azure.io.retry.backoff.interval";
public static final String AZURE_MAX_IO_RETRIES = "fs.azure.io.retry.max.retries";
public static final String AZURE_CUSTOM_TOKEN_FETCH_RETRY_COUNT = "fs.azure.custom.token.fetch.retry.count";

/**
* Config to set HTTP Connection Timeout Value for Rest Operations.
* Value: {@value}.
*/
public static final String AZURE_HTTP_CONNECTION_TIMEOUT = "fs.azure.http.connection.timeout";
/**
* Config to set HTTP Read Timeout Value for Rest Operations.
* Value: {@value}.
*/
public static final String AZURE_HTTP_READ_TIMEOUT = "fs.azure.http.read.timeout";

// Retry strategy for getToken calls
public static final String AZURE_OAUTH_TOKEN_FETCH_RETRY_COUNT = "fs.azure.oauth.token.fetch.retry.max.retries";
public static final String AZURE_OAUTH_TOKEN_FETCH_RETRY_MIN_BACKOFF = "fs.azure.oauth.token.fetch.retry.min.backoff.interval";
Expand Down Expand Up @@ -262,6 +275,9 @@ public final class ConfigurationKeys {
/** Add extra resilience to rename failures, at the expense of performance. */
public static final String FS_AZURE_ABFS_RENAME_RESILIENCE = "fs.azure.enable.rename.resilience";

/** Add extra layer of verification of the integrity of the request content during transport: {@value}. */
public static final String FS_AZURE_ABFS_ENABLE_CHECKSUM_VALIDATION = "fs.azure.enable.checksum.validation";

public static String accountProperty(String property, String account) {
return property + "." + account;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,28 @@ public final class FileSystemConfigurations {
public static final boolean DEFAULT_FS_AZURE_ACCOUNT_IS_EXPECT_HEADER_ENABLED = true;
public static final String USER_HOME_DIRECTORY_PREFIX = "/user";

private static final int SIXTY_SECONDS = 60 * 1000;
private static final int SIXTY_SECONDS = 60_000;

// Retry parameter defaults.
public static final int DEFAULT_MIN_BACKOFF_INTERVAL = 3 * 1000; // 3s
public static final int DEFAULT_MAX_BACKOFF_INTERVAL = 30 * 1000; // 30s
public static final int DEFAULT_BACKOFF_INTERVAL = 3 * 1000; // 3s
public static final int DEFAULT_MIN_BACKOFF_INTERVAL = 3_000; // 3s
public static final int DEFAULT_MAX_BACKOFF_INTERVAL = 30_000; // 30s
public static final boolean DEFAULT_STATIC_RETRY_FOR_CONNECTION_TIMEOUT_ENABLED = true;
public static final int DEFAULT_STATIC_RETRY_INTERVAL = 1_000; // 1s
public static final int DEFAULT_BACKOFF_INTERVAL = 3_000; // 3s
public static final int DEFAULT_MAX_RETRY_ATTEMPTS = 30;
public static final int DEFAULT_CUSTOM_TOKEN_FETCH_RETRY_COUNT = 3;

/**
* Default value of connection timeout to be used while setting up HTTP Connection.
* Value: {@value}.
*/
public static final int DEFAULT_HTTP_CONNECTION_TIMEOUT = 2_000; // 2s
/**
* Default value of read timeout to be used while setting up HTTP Connection.
* Value: {@value}.
*/
public static final int DEFAULT_HTTP_READ_TIMEOUT = 30_000; // 30 secs

// Retry parameter defaults.
public static final int DEFAULT_AZURE_OAUTH_TOKEN_FETCH_RETRY_MAX_ATTEMPTS = 5;
public static final int DEFAULT_AZURE_OAUTH_TOKEN_FETCH_RETRY_MIN_BACKOFF_INTERVAL = 0;
Expand Down Expand Up @@ -120,6 +133,7 @@ public final class FileSystemConfigurations {
public static final int STREAM_ID_LEN = 12;
public static final boolean DEFAULT_ENABLE_ABFS_LIST_ITERATOR = true;
public static final boolean DEFAULT_ENABLE_ABFS_RENAME_RESILIENCE = true;
public static final boolean DEFAULT_ENABLE_ABFS_CHECKSUM_VALIDATION = false;

/**
* Limit of queued block upload operations before writes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ public final class HttpHeaderConfigurations {
public static final String X_MS_PROPOSED_LEASE_ID = "x-ms-proposed-lease-id";
public static final String X_MS_LEASE_BREAK_PERIOD = "x-ms-lease-break-period";
public static final String EXPECT = "Expect";
public static final String X_MS_RANGE_GET_CONTENT_MD5 = "x-ms-range-get-content-md5";

private HttpHeaderConfigurations() {}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/


package org.apache.hadoop.fs.azurebfs.contracts.exceptions;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode;

/**
* Exception to be thrown if any Runtime Exception occurs.
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class AbfsDriverException extends AbfsRestOperationException {

private static final String ERROR_MESSAGE = "Runtime Exception Occurred In ABFS Driver";

public AbfsDriverException(final Exception innerException) {
super(
AzureServiceErrorCode.UNKNOWN.getStatusCode(),
AzureServiceErrorCode.UNKNOWN.getErrorCode(),
innerException != null
? innerException.toString()
: ERROR_MESSAGE,
innerException);
}

public AbfsDriverException(final Exception innerException, final String activityId) {
super(
AzureServiceErrorCode.UNKNOWN.getStatusCode(),
AzureServiceErrorCode.UNKNOWN.getErrorCode(),
innerException != null
? innerException.toString() + ", rId: " + activityId
: ERROR_MESSAGE + ", rId: " + activityId,
null);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/


package org.apache.hadoop.fs.azurebfs.contracts.exceptions;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode;

/**
* Exception to wrap invalid checksum verification on client side.
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class AbfsInvalidChecksumException extends AbfsRestOperationException {

private static final String ERROR_MESSAGE = "Checksum Validation Failed, MD5 Mismatch Error";

public AbfsInvalidChecksumException(final AbfsRestOperationException abfsRestOperationException) {
super(
abfsRestOperationException != null
? abfsRestOperationException.getStatusCode()
: AzureServiceErrorCode.UNKNOWN.getStatusCode(),
abfsRestOperationException != null
? abfsRestOperationException.getErrorCode().getErrorCode()
: AzureServiceErrorCode.UNKNOWN.getErrorCode(),
abfsRestOperationException != null
? abfsRestOperationException.toString()
: ERROR_MESSAGE,
abfsRestOperationException);
}

public AbfsInvalidChecksumException(final String activityId) {
super(
AzureServiceErrorCode.UNKNOWN.getStatusCode(),
AzureServiceErrorCode.UNKNOWN.getErrorCode(),
ERROR_MESSAGE + ", rId: " + activityId,
null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ public enum AzureServiceErrorCode {
INVALID_QUERY_PARAMETER_VALUE("InvalidQueryParameterValue", HttpURLConnection.HTTP_BAD_REQUEST, null),
AUTHORIZATION_PERMISSION_MISS_MATCH("AuthorizationPermissionMismatch", HttpURLConnection.HTTP_FORBIDDEN, null),
ACCOUNT_REQUIRES_HTTPS("AccountRequiresHttps", HttpURLConnection.HTTP_BAD_REQUEST, null),
MD5_MISMATCH("Md5Mismatch", HttpURLConnection.HTTP_BAD_REQUEST,
"The MD5 value specified in the request did not match with the MD5 value calculated by the server."),
UNKNOWN(null, -1, null);

private final String errorCode;
Expand Down
Loading