Skip to content

CSOT refactoring #1781

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 12 commits into
base: main
Choose a base branch
from
367 changes: 198 additions & 169 deletions driver-core/src/main/com/mongodb/internal/TimeoutContext.java

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
*<p>This class is not part of the public API and may be removed or changed at any time</p>
*/
public interface SingleResultCallback<T> {
SingleResultCallback<Void> THEN_DO_NOTHING = (r, t) -> {};

/**
* Called when the function completes. This method must not complete abruptly, see {@link AsyncCallbackFunction} for more details.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright 2008-present MongoDB, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.mongodb.internal.async.function;

import com.mongodb.internal.async.SingleResultCallback;
import com.mongodb.lang.Nullable;

/**
* An {@linkplain AsyncCallbackFunction asynchronous callback-based function} of three parameters.
* This class is a callback-based.
*
* <p>This class is not part of the public API and may be removed or changed at any time</p>
*
* @param <P1> The type of the first parameter to the function.
* @param <P2> The type of the second parameter to the function.
* @param <P3> The type of the third parameter to the function.
* @param <R> See {@link AsyncCallbackFunction}
* @see AsyncCallbackFunction
*/
@FunctionalInterface
public interface AsyncCallbackTriFunction<P1, P2, P3, R> {
/**
* @param p1 The first {@code @}{@link Nullable} argument of the asynchronous function.
* @param p2 The second {@code @}{@link Nullable} argument of the asynchronous function.
* @param p3 The second {@code @}{@link Nullable} argument of the asynchronous function.
* @see AsyncCallbackFunction#apply(Object, SingleResultCallback)
*/
void apply(P1 p1, P2 p2, P3 p3, SingleResultCallback<R> callback);
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@
@NotThreadSafe
public final class RetryState {
public static final int RETRIES = 1;
private static final int INFINITE_ATTEMPTS = Integer.MAX_VALUE;
public static final int INFINITE_ATTEMPTS = Integer.MAX_VALUE;

private final LoopState loopState;
private final int attempts;
Expand All @@ -67,19 +67,16 @@ public final class RetryState {
* </p>
*
* @param retries A positive number of allowed retries. {@link Integer#MAX_VALUE} is a special value interpreted as being unlimited.
* @param timeoutContext A timeout context that will be used to determine if the operation has timed out.
* @param retryUntilTimeoutThrowsException If {@code true}, then if a {@link MongoOperationTimeoutException} is throws then retrying stops.
* @see #attempts()
*/
public static RetryState withRetryableState(final int retries, final TimeoutContext timeoutContext) {
public static RetryState withRetryableState(final int retries, final boolean retryUntilTimeoutThrowsException) {
assertTrue(retries > 0);
if (timeoutContext.hasTimeoutMS()){
return new RetryState(INFINITE_ATTEMPTS, timeoutContext);
}
return new RetryState(retries, null);
return new RetryState(retries, retryUntilTimeoutThrowsException);
}

public static RetryState withNonRetryableState() {
return new RetryState(0, null);
return new RetryState(0, false);
}

/**
Expand All @@ -94,19 +91,19 @@ public static RetryState withNonRetryableState() {
* @see #attempts()
*/
public RetryState(final TimeoutContext timeoutContext) {
this(INFINITE_ATTEMPTS, timeoutContext);
this(INFINITE_ATTEMPTS, timeoutContext.hasTimeoutMS());
}

/**
* @param retries A non-negative number of allowed retries. {@link Integer#MAX_VALUE} is a special value interpreted as being unlimited.
* @param timeoutContext A timeout context that will be used to determine if the operation has timed out.
* @param retryUntilTimeoutThrowsException
* @see #attempts()
*/
private RetryState(final int retries, @Nullable final TimeoutContext timeoutContext) {
private RetryState(final int retries, final boolean retryUntilTimeoutThrowsException) {
assertTrue(retries >= 0);
loopState = new LoopState();
attempts = retries == INFINITE_ATTEMPTS ? INFINITE_ATTEMPTS : retries + 1;
this.retryUntilTimeoutThrowsException = timeoutContext != null && timeoutContext.hasTimeoutMS();
this.retryUntilTimeoutThrowsException = retryUntilTimeoutThrowsException;
}

/**
Expand Down Expand Up @@ -400,7 +397,7 @@ public int attempt() {
* <ul>
* <li>0 if the number of retries is {@linkplain #RetryState(TimeoutContext) unlimited};</li>
* <li>1 if no retries are allowed;</li>
* <li>{@link #RetryState(int, TimeoutContext) retries} + 1 otherwise.</li>
* <li>{@link #RetryState(int, boolean) retries} + 1 otherwise.</li>
* </ul>
*
* @see #attempt()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import com.mongodb.ServerAddress;
import com.mongodb.internal.async.SingleResultCallback;
import com.mongodb.internal.connection.OperationContext;

/**
* <p>This class is not part of the public API and may be removed or changed at any time</p>
Expand All @@ -28,9 +29,10 @@ public interface AsyncClusterAwareReadWriteBinding extends AsyncReadWriteBinding
* Returns a connection source to the specified server
*
* @param serverAddress the server address
* @param operationContext the operation context to use
* @param callback the to be passed the connection source
*/
void getConnectionSource(ServerAddress serverAddress, SingleResultCallback<AsyncConnectionSource> callback);
void getConnectionSource(ServerAddress serverAddress, OperationContext operationContext, SingleResultCallback<AsyncConnectionSource> callback);

@Override
AsyncClusterAwareReadWriteBinding retain();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.mongodb.internal.connection.AsyncConnection;
import com.mongodb.internal.connection.Cluster;
import com.mongodb.internal.connection.OperationContext;
import com.mongodb.internal.connection.ReadConcernAwareNoOpSessionContext;
import com.mongodb.internal.connection.Server;
import com.mongodb.internal.selector.ReadPreferenceServerSelector;
import com.mongodb.internal.selector.ReadPreferenceWithFallbackServerSelector;
Expand All @@ -33,7 +34,6 @@
import com.mongodb.selector.ServerSelector;

import static com.mongodb.assertions.Assertions.notNull;
import static java.util.concurrent.TimeUnit.NANOSECONDS;

/**
* A simple ReadWriteBinding implementation that supplies write connection sources bound to a possibly different primary each time, and a
Expand All @@ -44,24 +44,17 @@
public class AsyncClusterBinding extends AbstractReferenceCounted implements AsyncClusterAwareReadWriteBinding {
private final Cluster cluster;
private final ReadPreference readPreference;
private final ReadConcern readConcern;
private final OperationContext operationContext;

/**
* Creates an instance.
*
* @param cluster a non-null Cluster which will be used to select a server to bind to
* @param readPreference a non-null ReadPreference for read operations
* @param readConcern a non-null read concern
* @param operationContext the operation context
* <p>This class is not part of the public API and may be removed or changed at any time</p>
*/
public AsyncClusterBinding(final Cluster cluster, final ReadPreference readPreference, final ReadConcern readConcern,
final OperationContext operationContext) {
public AsyncClusterBinding(final Cluster cluster, final ReadPreference readPreference) {
this.cluster = notNull("cluster", cluster);
this.readPreference = notNull("readPreference", readPreference);
this.readConcern = notNull("readConcern", readConcern);
this.operationContext = notNull("operationContext", operationContext);
}

@Override
Expand All @@ -76,21 +69,18 @@ public ReadPreference getReadPreference() {
}

@Override
public OperationContext getOperationContext() {
return operationContext;
}

@Override
public void getReadConnectionSource(final SingleResultCallback<AsyncConnectionSource> callback) {
getAsyncClusterBindingConnectionSource(new ReadPreferenceServerSelector(readPreference), callback);
public void getReadConnectionSource(final OperationContext operationContext,
final SingleResultCallback<AsyncConnectionSource> callback) {
getAsyncClusterBindingConnectionSource(new ReadPreferenceServerSelector(readPreference), operationContext, callback);
}

@Override
public void getReadConnectionSource(final int minWireVersion, final ReadPreference fallbackReadPreference,
final OperationContext operationContext,
final SingleResultCallback<AsyncConnectionSource> callback) {
// Assume 5.0+ for load-balanced mode
if (cluster.getSettings().getMode() == ClusterConnectionMode.LOAD_BALANCED) {
getReadConnectionSource(callback);
getReadConnectionSource(operationContext, callback);
} else {
ReadPreferenceWithFallbackServerSelector readPreferenceWithFallbackServerSelector
= new ReadPreferenceWithFallbackServerSelector(readPreference, minWireVersion, fallbackReadPreference);
Expand All @@ -106,16 +96,19 @@ public void getReadConnectionSource(final int minWireVersion, final ReadPreferen
}

@Override
public void getWriteConnectionSource(final SingleResultCallback<AsyncConnectionSource> callback) {
getAsyncClusterBindingConnectionSource(new WritableServerSelector(), callback);
public void getWriteConnectionSource(final OperationContext operationContext,
final SingleResultCallback<AsyncConnectionSource> callback) {
getAsyncClusterBindingConnectionSource(new WritableServerSelector(), operationContext, callback);
}

@Override
public void getConnectionSource(final ServerAddress serverAddress, final SingleResultCallback<AsyncConnectionSource> callback) {
getAsyncClusterBindingConnectionSource(new ServerAddressSelector(serverAddress), callback);
public void getConnectionSource(final ServerAddress serverAddress, final OperationContext operationContext,
final SingleResultCallback<AsyncConnectionSource> callback) {
getAsyncClusterBindingConnectionSource(new ServerAddressSelector(serverAddress), operationContext, callback);
}

private void getAsyncClusterBindingConnectionSource(final ServerSelector serverSelector,
final OperationContext operationContext,
final SingleResultCallback<AsyncConnectionSource> callback) {
cluster.selectServerAsync(serverSelector, operationContext, (result, t) -> {
if (t != null) {
Expand All @@ -132,12 +125,12 @@ private final class AsyncClusterBindingConnectionSource extends AbstractReferenc
private final ServerDescription serverDescription;
private final ReadPreference appliedReadPreference;

private AsyncClusterBindingConnectionSource(final Server server, final ServerDescription serverDescription,
final ReadPreference appliedReadPreference) {
private AsyncClusterBindingConnectionSource(final Server server,
final ServerDescription serverDescription,
final ReadPreference appliedReadPreference) {
this.server = server;
this.serverDescription = serverDescription;
this.appliedReadPreference = appliedReadPreference;
operationContext.getTimeoutContext().minRoundTripTimeMS(NANOSECONDS.toMillis(serverDescription.getMinRoundTripTimeNanos()));
AsyncClusterBinding.this.retain();
}

Expand All @@ -146,19 +139,20 @@ public ServerDescription getServerDescription() {
return serverDescription;
}

@Override
public OperationContext getOperationContext() {
return operationContext;
}

@Override
public ReadPreference getReadPreference() {
return appliedReadPreference;
}

@Override
public void getConnection(final SingleResultCallback<AsyncConnection> callback) {
server.getConnectionAsync(operationContext, callback);
public void getConnection(final OperationContext operationContext, final SingleResultCallback<AsyncConnection> callback) {
/*
The first read in a causally consistent session MUST not send afterClusterTime to the server
(because the operationTime has not yet been determined). Therefore, we use ReadConcernAwareNoOpSessionContext to
so that we do not advance clusterTime on ClientSession in given operationContext because it might not be yet set.
*/
ReadConcern readConcern = operationContext.getSessionContext().getReadConcern();
server.getConnectionAsync(operationContext.withSessionContext(new ReadConcernAwareNoOpSessionContext(readConcern)), callback);
}

public AsyncConnectionSource retain() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,14 @@
import com.mongodb.connection.ServerDescription;
import com.mongodb.internal.async.SingleResultCallback;
import com.mongodb.internal.connection.AsyncConnection;
import com.mongodb.internal.connection.OperationContext;

/**
* A source of connections to a single MongoDB server.
*
* <p>This class is not part of the public API and may be removed or changed at any time</p>
*/
public interface AsyncConnectionSource extends BindingContext, ReferenceCounted {
public interface AsyncConnectionSource extends ReferenceCounted {

/**
* Gets the current description of this source.
Expand All @@ -38,16 +39,17 @@ public interface AsyncConnectionSource extends BindingContext, ReferenceCounted
/**
* Gets the read preference that was applied when selecting this source.
*
* @see AsyncReadBinding#getReadConnectionSource(int, ReadPreference, SingleResultCallback)
* @see AsyncReadBinding#getReadConnectionSource(int, ReadPreference, OperationContext, SingleResultCallback)
*/
ReadPreference getReadPreference();

/**
* Gets a connection from this source.
*
* @param operationContext the operation context to use
* @param callback the to be passed the connection
*/
void getConnection(SingleResultCallback<AsyncConnection> callback);
void getConnection(OperationContext operationContext, SingleResultCallback<AsyncConnection> callback);

@Override
AsyncConnectionSource retain();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,14 @@

import com.mongodb.ReadPreference;
import com.mongodb.internal.async.SingleResultCallback;
import com.mongodb.internal.connection.OperationContext;

/**
* An asynchronous factory of connection sources to servers that can be read from and that satisfy the specified read preference.
*
* <p>This class is not part of the public API and may be removed or changed at any time</p>
*/
public interface AsyncReadBinding extends BindingContext, ReferenceCounted {
public interface AsyncReadBinding extends ReferenceCounted {
/**
* The read preference that all connection sources returned by this instance will satisfy.
* @return the non-null read preference
Expand All @@ -33,9 +34,10 @@ public interface AsyncReadBinding extends BindingContext, ReferenceCounted {

/**
* Returns a connection source to a server that satisfies the read preference with which this instance is configured.
* @param operationContext the operation context to use
* @param callback the to be passed the connection source
*/
void getReadConnectionSource(SingleResultCallback<AsyncConnectionSource> callback);
void getReadConnectionSource(OperationContext operationContext, SingleResultCallback<AsyncConnectionSource> callback);

/**
* Return a connection source that satisfies the read preference with which this instance is configured, if all connected servers have
Expand All @@ -48,6 +50,7 @@ public interface AsyncReadBinding extends BindingContext, ReferenceCounted {
* @see com.mongodb.internal.operation.AggregateToCollectionOperation
*/
void getReadConnectionSource(int minWireVersion, ReadPreference fallbackReadPreference,
OperationContext operationContext,
SingleResultCallback<AsyncConnectionSource> callback);

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,22 @@
package com.mongodb.internal.binding;

import com.mongodb.internal.async.SingleResultCallback;
import com.mongodb.internal.connection.OperationContext;

/**
* An asynchronous factory of connection sources to servers that can be written to, e.g, a standalone, a mongos, or a replica set primary.
*
* <p>This class is not part of the public API and may be removed or changed at any time</p>
*/
public interface AsyncWriteBinding extends BindingContext, ReferenceCounted {
public interface AsyncWriteBinding extends ReferenceCounted {

/**
* Supply a connection source to a server that can be written to
*
* @param operationContext the operation context to use
* @param callback the to be passed the connection source
*/
void getWriteConnectionSource(SingleResultCallback<AsyncConnectionSource> callback);
void getWriteConnectionSource(OperationContext operationContext, SingleResultCallback<AsyncConnectionSource> callback);

@Override
AsyncWriteBinding retain();
Expand Down
Loading