Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@
import com.google.cloud.spanner.connection.AbstractStatementParser.StatementType;
import com.google.cloud.spanner.connection.ConnectionProperty.Context;
import com.google.cloud.spanner.connection.ConnectionState.Type;
import com.google.cloud.spanner.connection.StatementExecutor.StatementExecutorType;
import com.google.cloud.spanner.connection.StatementExecutor.StatementTimeout;
import com.google.cloud.spanner.connection.StatementResult.ResultType;
import com.google.cloud.spanner.connection.UnitOfWork.CallType;
Expand Down Expand Up @@ -302,9 +303,17 @@ static UnitOfWorkType of(TransactionMode transactionMode) {
Preconditions.checkNotNull(options);
this.leakedException =
options.isTrackConnectionLeaks() ? new LeakedConnectionException() : null;
StatementExecutorType statementExecutorType;
if (options.getStatementExecutorType() != null) {
statementExecutorType = options.getStatementExecutorType();
} else {
statementExecutorType =
options.isUseVirtualThreads()
? StatementExecutorType.VIRTUAL_THREAD
: StatementExecutorType.DIRECT_EXECUTOR;
}
this.statementExecutor =
new StatementExecutor(
options.isUseVirtualThreads(), options.getStatementExecutionInterceptors());
new StatementExecutor(statementExecutorType, options.getStatementExecutionInterceptors());
this.spannerPool = SpannerPool.INSTANCE;
this.options = options;
this.spanner = spannerPool.getSpanner(options, this);
Expand Down Expand Up @@ -348,7 +357,11 @@ && getDialect() == Dialect.POSTGRESQL
this.leakedException =
options.isTrackConnectionLeaks() ? new LeakedConnectionException() : null;
this.statementExecutor =
new StatementExecutor(options.isUseVirtualThreads(), Collections.emptyList());
new StatementExecutor(
options.isUseVirtualThreads()
? StatementExecutorType.VIRTUAL_THREAD
: StatementExecutorType.DIRECT_EXECUTOR,
Collections.emptyList());
this.spannerPool = Preconditions.checkNotNull(spannerPool);
this.options = Preconditions.checkNotNull(options);
this.spanner = spannerPool.getSpanner(options, this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
import com.google.cloud.spanner.SpannerException;
import com.google.cloud.spanner.SpannerExceptionFactory;
import com.google.cloud.spanner.SpannerOptions;
import com.google.cloud.spanner.connection.StatementExecutor.StatementExecutorType;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
Expand Down Expand Up @@ -614,6 +615,7 @@ public static class Builder {
new HashMap<>();
private String uri;
private Credentials credentials;
private StatementExecutorType statementExecutorType;
private SessionPoolOptions sessionPoolOptions;
private List<StatementExecutionInterceptor> statementExecutionInterceptors =
Collections.emptyList();
Expand Down Expand Up @@ -777,6 +779,11 @@ Builder setCredentials(Credentials credentials) {
return this;
}

Builder setStatementExecutorType(StatementExecutorType statementExecutorType) {
this.statementExecutorType = statementExecutorType;
return this;
}

public Builder setOpenTelemetry(OpenTelemetry openTelemetry) {
this.openTelemetry = openTelemetry;
return this;
Expand Down Expand Up @@ -814,6 +821,7 @@ public static Builder newBuilder() {
private final String instanceId;
private final String databaseName;
private final Credentials credentials;
private final StatementExecutorType statementExecutorType;
private final SessionPoolOptions sessionPoolOptions;

private final OpenTelemetry openTelemetry;
Expand All @@ -834,6 +842,7 @@ private ConnectionOptions(Builder builder) {
ConnectionPropertyValue<Boolean> value = cast(connectionPropertyValues.get(LENIENT.getKey()));
this.warnings = checkValidProperties(value != null && value.getValue(), uri);
this.fixedCredentials = builder.credentials;
this.statementExecutorType = builder.statementExecutorType;

this.openTelemetry = builder.openTelemetry;
this.statementExecutionInterceptors =
Expand Down Expand Up @@ -1105,6 +1114,10 @@ CredentialsProvider getCredentialsProvider() {
return getInitialConnectionPropertyValue(CREDENTIALS_PROVIDER);
}

StatementExecutorType getStatementExecutorType() {
return this.statementExecutorType;
}

/** The {@link SessionPoolOptions} of this {@link ConnectionOptions}. */
public SessionPoolOptions getSessionPoolOptions() {
return sessionPoolOptions;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,10 @@ java.time.Duration asDuration() {
ThreadFactoryUtil.createVirtualOrPlatformDaemonThreadFactory("connection-executor", false);

/** Creates an {@link ExecutorService} for a {@link StatementExecutor}. */
private static ListeningExecutorService createExecutorService(boolean useVirtualThreads) {
private static ListeningExecutorService createExecutorService(StatementExecutorType type) {
if (type == StatementExecutorType.DIRECT_EXECUTOR) {
return MoreExecutors.newDirectExecutorService();
}
return MoreExecutors.listeningDecorator(
Context.taskWrapping(
new ThreadPoolExecutor(
Expand All @@ -155,7 +158,7 @@ private static ListeningExecutorService createExecutorService(boolean useVirtual
0L,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(),
useVirtualThreads
type == StatementExecutorType.VIRTUAL_THREAD
? DEFAULT_VIRTUAL_THREAD_FACTORY
: DEFAULT_DAEMON_THREAD_FACTORY)));
}
Expand All @@ -168,13 +171,23 @@ private static ListeningExecutorService createExecutorService(boolean useVirtual
*/
private final List<StatementExecutionInterceptor> interceptors;

enum StatementExecutorType {
PLATFORM_THREAD,
VIRTUAL_THREAD,
DIRECT_EXECUTOR,
}

@VisibleForTesting
StatementExecutor() {
this(DEFAULT_USE_VIRTUAL_THREADS, Collections.emptyList());
this(
DEFAULT_USE_VIRTUAL_THREADS
? StatementExecutorType.VIRTUAL_THREAD
: StatementExecutorType.PLATFORM_THREAD,
Collections.emptyList());
}

StatementExecutor(boolean useVirtualThreads, List<StatementExecutionInterceptor> interceptors) {
this.executor = createExecutorService(useVirtualThreads);
StatementExecutor(StatementExecutorType type, List<StatementExecutionInterceptor> interceptors) {
this.executor = createExecutorService(type);
this.interceptors = Collections.unmodifiableList(interceptors);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,7 @@ ITConnection createConnection(
ConnectionOptions.newBuilder()
.setUri(getBaseUrl() + additionalUrlOptions)
.setStatementExecutionInterceptors(interceptors);
configureConnectionOptions(builder);
ConnectionOptions options = builder.build();
ITConnection connection = createITConnection(options);
for (TransactionRetryListener listener : transactionRetryListeners) {
Expand All @@ -291,6 +292,11 @@ ITConnection createConnection(
return connection;
}

protected ConnectionOptions.Builder configureConnectionOptions(
ConnectionOptions.Builder builder) {
return builder;
}

protected String getBaseUrl() {
return String.format(
"cloudspanner://localhost:%d/projects/proj/instances/inst/databases/db?usePlainText=true;autocommit=false;retryAbortsInternally=true",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,9 @@
import com.google.cloud.spanner.Options;
import com.google.cloud.spanner.SpannerExceptionFactory;
import com.google.cloud.spanner.Statement;
import com.google.cloud.spanner.connection.ConnectionOptions.Builder;
import com.google.cloud.spanner.connection.ITAbstractSpannerTest.ITConnection;
import com.google.cloud.spanner.connection.StatementExecutor.StatementExecutorType;
import com.google.common.collect.Collections2;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
Expand Down Expand Up @@ -127,6 +129,11 @@ ITConnection createConnection(TransactionRetryListener listener) {
return connection;
}

@Override
protected Builder configureConnectionOptions(Builder builder) {
return builder.setStatementExecutorType(StatementExecutorType.PLATFORM_THREAD);
}

@Test
public void testSingleQueryAborted() {
RetryCounter counter = new RetryCounter();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,9 @@
import com.google.cloud.spanner.SpannerApiFutures;
import com.google.cloud.spanner.SpannerException;
import com.google.cloud.spanner.Statement;
import com.google.cloud.spanner.connection.ConnectionOptions.Builder;
import com.google.cloud.spanner.connection.SpannerPool.CheckAndCloseSpannersMode;
import com.google.cloud.spanner.connection.StatementExecutor.StatementExecutorType;
import com.google.cloud.spanner.connection.StatementResult.ResultType;
import com.google.common.base.Function;
import com.google.common.collect.Collections2;
Expand Down Expand Up @@ -86,6 +88,11 @@ public void setup() {
}
}

@Override
protected Builder configureConnectionOptions(Builder builder) {
return builder.setStatementExecutorType(StatementExecutorType.PLATFORM_THREAD);
}

@After
public void reset() {
mockSpanner.removeAllExecutionTimes();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@
import com.google.cloud.spanner.SpannerOptions;
import com.google.cloud.spanner.Statement;
import com.google.cloud.spanner.TimestampBound;
import com.google.cloud.spanner.connection.ConnectionOptions.Builder;
import com.google.cloud.spanner.connection.StatementExecutor.StatementExecutorType;
import com.google.common.collect.ImmutableList;
import com.google.spanner.v1.BatchCreateSessionsRequest;
import com.google.spanner.v1.CommitRequest;
Expand Down Expand Up @@ -417,6 +419,11 @@ protected String getBaseUrl() {
return super.getBaseUrl() + ";maxSessions=1";
}

@Override
protected Builder configureConnectionOptions(Builder builder) {
return builder.setStatementExecutorType(StatementExecutorType.PLATFORM_THREAD);
}

@Test
public void testMaxSessions()
throws InterruptedException, TimeoutException, ExecutionException {
Expand Down
Loading
Loading