Skip to content

Make S3 custom query parameter optional #128043

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions docs/changelog/128043.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
pr: 128043
summary: Make S3 custom query parameter optional
area: Snapshot/Restore
type: breaking
issues: []
breaking:
title: Make S3 custom query parameter optional
area: Cluster and node setting
details: >-
Earlier versions of Elasticsearch would record the purpose of each S3 API
call using the `?x-purpose=` custom query parameter. This isn't believed to
be necessary outside of the ECH/ECE/ECK/... managed services, and it adds
rather a lot to the request logs, so with this change we make the feature
optional and disabled by default.
impact: >-
If you wish to reinstate the old behaviour on a S3 repository, set
`s3.client.${CLIENT_NAME}.add_purpose_custom_query_parameter` to `true`
for the relevant client.
notable: false
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
final Settings.Builder builder = Settings.builder()
.put(ThreadPool.ESTIMATED_TIME_INTERVAL_SETTING.getKey(), 0) // We have tests that verify an exact wait time
.put(S3ClientSettings.ENDPOINT_SETTING.getConcreteSettingForNamespace("test").getKey(), httpServerUrl())
.put(S3ClientSettings.ADD_PURPOSE_CUSTOM_QUERY_PARAMETER.getConcreteSettingForNamespace("test").getKey(), "true")
.put(super.nodeSettings(nodeOrdinal, otherSettings))
.setSecureSettings(secureSettings);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,12 @@ private Request getRegisterRequest(UnaryOperator<Settings> settingsUnaryOperator
.put("canned_acl", "private")
.put("storage_class", "standard")
.put("disable_chunked_encoding", randomBoolean())
.put(
randomFrom(
Settings.EMPTY,
Settings.builder().put("add_purpose_custom_query_parameter", randomBoolean()).build()
)
)
.build()
)
)
Expand Down Expand Up @@ -183,8 +189,10 @@ private void testNonexistentClient(Boolean readonly) throws Exception {
final var responseObjectPath = ObjectPath.createFromResponse(responseException.getResponse());
assertThat(responseObjectPath.evaluate("error.type"), equalTo("repository_verification_exception"));
assertThat(responseObjectPath.evaluate("error.reason"), containsString("is not accessible on master node"));
assertThat(responseObjectPath.evaluate("error.caused_by.type"), equalTo("illegal_argument_exception"));
assertThat(responseObjectPath.evaluate("error.caused_by.reason"), containsString("Unknown s3 client name"));
assertThat(responseObjectPath.evaluate("error.caused_by.type"), equalTo("repository_exception"));
assertThat(responseObjectPath.evaluate("error.caused_by.reason"), containsString("cannot create blob store"));
assertThat(responseObjectPath.evaluate("error.caused_by.caused_by.type"), equalTo("illegal_argument_exception"));
assertThat(responseObjectPath.evaluate("error.caused_by.caused_by.reason"), containsString("Unknown s3 client name"));
}

public void testNonexistentSnapshot() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1153,9 +1153,7 @@ ActionListener<Void> getMultipartUploadCleanupListener(int maxUploads, RefCounti
.prefix(keyPath)
.maxUploads(maxUploads)
// TODO adjust to use S3BlobStore.configureRequestForMetrics, adding metrics collection
.overrideConfiguration(
b -> b.putRawQueryParameter(S3BlobStore.CUSTOM_QUERY_PARAMETER_PURPOSE, OperationPurpose.SNAPSHOT_DATA.getKey())
)
.overrideConfiguration(b -> blobStore.addPurposeQueryParameter(OperationPurpose.SNAPSHOT_DATA, b))
.build();
final var multipartUploadListing = clientReference.client().listMultipartUploads(listMultipartUploadsRequest);
final var multipartUploads = multipartUploadListing.uploads();
Expand Down Expand Up @@ -1184,12 +1182,7 @@ ActionListener<Void> getMultipartUploadCleanupListener(int maxUploads, RefCounti
.key(u.key())
.uploadId(u.uploadId())
// TODO adjust to use S3BlobStore.configureRequestForMetrics, adding metrics collection
.overrideConfiguration(
b -> b.putRawQueryParameter(
S3BlobStore.CUSTOM_QUERY_PARAMETER_PURPOSE,
OperationPurpose.SNAPSHOT_DATA.getKey()
)
)
.overrideConfiguration(b -> blobStore.addPurposeQueryParameter(OperationPurpose.SNAPSHOT_DATA, b))
.build()
)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
package org.elasticsearch.repositories.s3;

import software.amazon.awssdk.awscore.AwsRequest;
import software.amazon.awssdk.awscore.AwsRequestOverrideConfiguration;
import software.amazon.awssdk.core.exception.SdkException;
import software.amazon.awssdk.core.metrics.CoreMetric;
import software.amazon.awssdk.core.retry.RetryUtils;
Expand Down Expand Up @@ -102,6 +103,8 @@ class S3BlobStore implements BlobStore {

private final TimeValue getRegisterRetryDelay;

private final boolean addPurposeCustomQueryParameter;

S3BlobStore(
S3Service service,
String bucket,
Expand Down Expand Up @@ -131,6 +134,7 @@ class S3BlobStore implements BlobStore {
this.bulkDeletionBatchSize = S3Repository.DELETION_BATCH_SIZE_SETTING.get(repositoryMetadata.settings());
this.retryThrottledDeleteBackoffPolicy = retryThrottledDeleteBackoffPolicy;
this.getRegisterRetryDelay = S3Repository.GET_REGISTER_RETRY_DELAY.get(repositoryMetadata.settings());
this.addPurposeCustomQueryParameter = service.settings(repositoryMetadata).addPurposeCustomQueryParameter;
}

MetricPublisher getMetricPublisher(Operation operation, OperationPurpose purpose) {
Expand Down Expand Up @@ -600,9 +604,17 @@ static void configureRequestForMetrics(
Operation operation,
OperationPurpose purpose
) {
request.overrideConfiguration(
builder -> builder.metricPublishers(List.of(blobStore.getMetricPublisher(operation, purpose)))
.putRawQueryParameter(CUSTOM_QUERY_PARAMETER_PURPOSE, purpose.getKey())
);
request.overrideConfiguration(builder -> {
builder.metricPublishers(List.of(blobStore.getMetricPublisher(operation, purpose)));
blobStore.addPurposeQueryParameter(purpose, builder);
});
}

public void addPurposeQueryParameter(OperationPurpose purpose, AwsRequestOverrideConfiguration.Builder builder) {
if (addPurposeCustomQueryParameter || purpose == OperationPurpose.REPOSITORY_ANALYSIS) {
// REPOSITORY_ANALYSIS is a strict check for 100% S3 compatibility, including custom query parameter support, so is always added
builder.putRawQueryParameter(CUSTOM_QUERY_PARAMETER_PURPOSE, purpose.getKey());
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,13 @@ final class S3ClientSettings {
key -> Setting.simpleString(key, Property.NodeScope, Property.Deprecated)
);

/** Whether to include the {@code x-purpose} custom query parameter in all requests. */
static final Setting.AffixSetting<Boolean> ADD_PURPOSE_CUSTOM_QUERY_PARAMETER = Setting.affixKeySetting(
PREFIX,
"add_purpose_custom_query_parameter",
key -> Setting.boolSetting(key, false, Property.NodeScope)
);

/** Credentials to authenticate with s3. */
final AwsCredentials credentials;

Expand Down Expand Up @@ -220,6 +227,9 @@ final class S3ClientSettings {
/** Whether chunked encoding should be disabled or not. */
final boolean disableChunkedEncoding;

/** Whether to add the {@code x-purpose} custom query parameter to all requests. */
final boolean addPurposeCustomQueryParameter;

/** Region to use for signing requests or empty string to use default. */
final String region;

Expand All @@ -237,6 +247,7 @@ private S3ClientSettings(
int maxRetries,
boolean pathStyleAccess,
boolean disableChunkedEncoding,
boolean addPurposeCustomQueryParameter,
String region
) {
this.credentials = credentials;
Expand All @@ -252,6 +263,7 @@ private S3ClientSettings(
this.maxRetries = maxRetries;
this.pathStyleAccess = pathStyleAccess;
this.disableChunkedEncoding = disableChunkedEncoding;
this.addPurposeCustomQueryParameter = addPurposeCustomQueryParameter;
this.region = region;
}

Expand Down Expand Up @@ -284,6 +296,11 @@ S3ClientSettings refine(Settings repositorySettings) {
normalizedSettings,
disableChunkedEncoding
);
final boolean newAddPurposeCustomQueryParameter = getRepoSettingOrDefault(
ADD_PURPOSE_CUSTOM_QUERY_PARAMETER,
normalizedSettings,
addPurposeCustomQueryParameter
);
final AwsCredentials newCredentials;
if (checkDeprecatedCredentials(repositorySettings)) {
newCredentials = loadDeprecatedCredentials(repositorySettings);
Expand All @@ -302,6 +319,7 @@ S3ClientSettings refine(Settings repositorySettings) {
&& Objects.equals(credentials, newCredentials)
&& newPathStyleAccess == pathStyleAccess
&& newDisableChunkedEncoding == disableChunkedEncoding
&& newAddPurposeCustomQueryParameter == addPurposeCustomQueryParameter
&& Objects.equals(region, newRegion)) {
return this;
}
Expand All @@ -319,6 +337,7 @@ S3ClientSettings refine(Settings repositorySettings) {
newMaxRetries,
newPathStyleAccess,
newDisableChunkedEncoding,
newAddPurposeCustomQueryParameter,
newRegion
);
}
Expand Down Expand Up @@ -426,6 +445,7 @@ static S3ClientSettings getClientSettings(final Settings settings, final String
getConfigValue(settings, clientName, MAX_RETRIES_SETTING),
getConfigValue(settings, clientName, USE_PATH_STYLE_ACCESS),
getConfigValue(settings, clientName, DISABLE_CHUNKED_ENCODING),
getConfigValue(settings, clientName, ADD_PURPOSE_CUSTOM_QUERY_PARAMETER),
getConfigValue(settings, clientName, REGION)
);
}
Expand All @@ -452,6 +472,7 @@ public boolean equals(final Object o) {
&& Objects.equals(proxyUsername, that.proxyUsername)
&& Objects.equals(proxyPassword, that.proxyPassword)
&& Objects.equals(disableChunkedEncoding, that.disableChunkedEncoding)
&& Objects.equals(addPurposeCustomQueryParameter, that.addPurposeCustomQueryParameter)
&& Objects.equals(region, that.region);
}

Expand All @@ -470,6 +491,7 @@ public int hashCode() {
maxRetries,
maxConnections,
disableChunkedEncoding,
addPurposeCustomQueryParameter,
region
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ public List<Setting<?>> getSettings() {
S3ClientSettings.UNUSED_USE_THROTTLE_RETRIES_SETTING,
S3ClientSettings.USE_PATH_STYLE_ACCESS,
S3ClientSettings.UNUSED_SIGNER_OVERRIDE,
S3ClientSettings.ADD_PURPOSE_CUSTOM_QUERY_PARAMETER,
S3ClientSettings.REGION,
S3Service.REPOSITORY_S3_CAS_TTL_SETTING,
S3Service.REPOSITORY_S3_CAS_ANTI_CONTENTION_DELAY_SETTING,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.repositories.s3;

import fixture.s3.S3HttpFixture;
import fixture.s3.S3HttpHandler;

import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpHandler;

import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.admin.cluster.repositories.put.PutRepositoryRequest;
import org.elasticsearch.action.admin.cluster.repositories.put.TransportPutRepositoryAction;
import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotRequest;
import org.elasticsearch.action.admin.cluster.snapshots.create.TransportCreateSnapshotAction;
import org.elasticsearch.common.settings.MockSecureSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.CollectionUtils;
import org.elasticsearch.core.SuppressForbidden;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.snapshots.SnapshotState;
import org.elasticsearch.test.ESSingleNodeTestCase;
import org.hamcrest.Matcher;
import org.junit.runner.Description;
import org.junit.runners.model.Statement;

import java.io.IOException;
import java.util.Collection;
import java.util.List;

import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.not;

public class AddPurposeCustomQueryParameterTests extends ESSingleNodeTestCase {

@Override
protected Collection<Class<? extends Plugin>> getPlugins() {
return CollectionUtils.appendToCopyNoNullElements(super.getPlugins(), S3RepositoryPlugin.class);
}

@Override
protected Settings nodeSettings() {
final var secureSettings = new MockSecureSettings();
for (final var clientName : List.of("default", "with_purpose", "without_purpose")) {
secureSettings.setString(
S3ClientSettings.ACCESS_KEY_SETTING.getConcreteSettingForNamespace(clientName).getKey(),
randomIdentifier()
);
secureSettings.setString(
S3ClientSettings.SECRET_KEY_SETTING.getConcreteSettingForNamespace(clientName).getKey(),
randomSecretKey()
);
}

return Settings.builder()
.put(super.nodeSettings())
.put(S3ClientSettings.REGION.getConcreteSettingForNamespace("default").getKey(), randomIdentifier())
.put(S3ClientSettings.ADD_PURPOSE_CUSTOM_QUERY_PARAMETER.getConcreteSettingForNamespace("with_purpose").getKey(), "true")
.put(S3ClientSettings.ADD_PURPOSE_CUSTOM_QUERY_PARAMETER.getConcreteSettingForNamespace("without_purpose").getKey(), "false")
.setSecureSettings(secureSettings)
.build();
}

private static final Matcher<Iterable<? super String>> HAS_CUSTOM_QUERY_PARAMETER = hasItem(S3BlobStore.CUSTOM_QUERY_PARAMETER_PURPOSE);
private static final Matcher<Iterable<? super String>> NO_CUSTOM_QUERY_PARAMETER = not(HAS_CUSTOM_QUERY_PARAMETER);

public void testCustomQueryParameterConfiguration() throws Throwable {
final var indexName = randomIdentifier();
createIndex(indexName);
prepareIndex(indexName).setSource("foo", "bar").get();

final var bucket = randomIdentifier();
final var basePath = randomIdentifier();

runCustomQueryParameterTest(bucket, basePath, null, Settings.EMPTY, NO_CUSTOM_QUERY_PARAMETER);
runCustomQueryParameterTest(bucket, basePath, "default", Settings.EMPTY, NO_CUSTOM_QUERY_PARAMETER);
runCustomQueryParameterTest(bucket, basePath, "without_purpose", Settings.EMPTY, NO_CUSTOM_QUERY_PARAMETER);
runCustomQueryParameterTest(bucket, basePath, "with_purpose", Settings.EMPTY, HAS_CUSTOM_QUERY_PARAMETER);

final var falseRepositorySetting = Settings.builder().put("add_purpose_custom_query_parameter", false).build();
final var trueRepositorySetting = Settings.builder().put("add_purpose_custom_query_parameter", true).build();
for (final var clientName : new String[] { null, "default", "with_purpose", "without_purpose" }) {
// client name doesn't matter if repository setting specified
runCustomQueryParameterTest(bucket, basePath, clientName, falseRepositorySetting, NO_CUSTOM_QUERY_PARAMETER);
runCustomQueryParameterTest(bucket, basePath, clientName, trueRepositorySetting, HAS_CUSTOM_QUERY_PARAMETER);
}
}

private void runCustomQueryParameterTest(
String bucket,
String basePath,
String clientName,
Settings extraRepositorySettings,
Matcher<Iterable<? super String>> queryParamMatcher
) throws Throwable {
final var httpFixture = new S3HttpFixture(true, bucket, basePath, (key, token) -> true) {

@SuppressForbidden(reason = "this test uses a HttpServer to emulate an S3 endpoint")
class AssertingHandler extends S3HttpHandler {
AssertingHandler() {
super(bucket, basePath);
}

@SuppressForbidden(reason = "this test uses a HttpServer to emulate an S3 endpoint")
@Override
public void handle(HttpExchange exchange) throws IOException {
try {
assertThat(parseRequest(exchange).queryParameters().keySet(), queryParamMatcher);
super.handle(exchange);
} catch (Error e) {
// HttpServer catches Throwable, so we must throw errors on another thread
ExceptionsHelper.maybeDieOnAnotherThread(e);
throw e;
}
}
}

@SuppressForbidden(reason = "this test uses a HttpServer to emulate an S3 endpoint")
@Override
protected HttpHandler createHandler() {
return new AssertingHandler();
}
Comment on lines +128 to +130
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we make AssertingHandler delegate to the handler returned by super.createHandler() so that we don't miss the fix from #102976?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure see 6b91e9b. It doesn't really matter here, the test fails either way.

};
httpFixture.apply(new Statement() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to stop the fixture explicitly especially when the test fails?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's handled within apply().

@Override
public void evaluate() {
final var repoName = randomIdentifier();
assertAcked(
client().execute(
TransportPutRepositoryAction.TYPE,
new PutRepositoryRequest(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT, repoName).type(S3Repository.TYPE)
.settings(
Settings.builder()
.put("bucket", bucket)
.put("base_path", basePath)
.put("endpoint", httpFixture.getAddress())
.put(clientName == null ? Settings.EMPTY : Settings.builder().put("client", clientName).build())
.put(extraRepositorySettings)
)
)
);

assertEquals(
SnapshotState.SUCCESS,
client().execute(
TransportCreateSnapshotAction.TYPE,
new CreateSnapshotRequest(TEST_REQUEST_TIMEOUT, repoName, randomIdentifier()).waitForCompletion(true)
).actionGet(SAFE_AWAIT_TIMEOUT).getSnapshotInfo().state()
);
}
}, Description.EMPTY).evaluate();
}

}
Loading
Loading