Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 51 additions & 0 deletions api/src/main/java/io/grpc/ClientTransportFilter.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Copyright 2023 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.grpc;

/**
* Listens on the client transport life-cycle events. These filters do not have the capability
* to modify the channels or transport life-cycle event behavior, but they can be useful hooks
* for transport observability. Multiple filters may be registered to the client.
*
* @since 1.61.0
*/
@ExperimentalApi("https://gitub.com/grpc/grpc-java/issues/10652")
public abstract class ClientTransportFilter {
/**
* Called when a transport is ready to accept traffic (when a connection has been established).
* The default implementation is a no-op.
*
* @param transportAttrs current transport attributes
*
* @return new transport attributes. Default implementation returns the passed-in attributes
* intact.
*/
public Attributes transportReady(Attributes transportAttrs) {
return transportAttrs;
}

/**
* Called when a transport completed shutting down. All resources have been released.
* All streams have either been closed or transferred off this transport.
* Default implementation is a no-op
*
* @param transportAttrs the effective transport attributes, which is what is returned by {@link
* #transportReady} of the last executed filter.
*/
public void transportTerminated(Attributes transportAttrs) {
}
}
6 changes: 6 additions & 0 deletions api/src/main/java/io/grpc/ForwardingChannelBuilder2.java
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,12 @@ public T intercept(ClientInterceptor... interceptors) {
return thisT();
}

@Override
public T addTransportFilter(ClientTransportFilter transportFilter) {
delegate().addTransportFilter(transportFilter);
return thisT();
}

@Override
public T userAgent(String userAgent) {
delegate().userAgent(userAgent);
Expand Down
12 changes: 12 additions & 0 deletions api/src/main/java/io/grpc/ManagedChannelBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,18 @@ public T offloadExecutor(Executor executor) {
*/
public abstract T intercept(ClientInterceptor... interceptors);

/**
* Adds a {@link ClientTransportFilter}. The order of filters being added is the order they will
* be executed
*
* @return this
* @since 1.60.0
*/
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/10652")
public T addTransportFilter(ClientTransportFilter filter) {
throw new UnsupportedOperationException();
}

/**
* Provides a custom {@code User-Agent} for the application.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -758,6 +758,7 @@ private void checkSecurityPolicy(IBinder binder) {
// triggers), could have shut us down.
if (!isShutdown()) {
setState(TransportState.READY);
attributes = clientTransportListener.filterTransport(attributes);
clientTransportListener.transportReady();
}
}
Expand Down
19 changes: 18 additions & 1 deletion core/src/main/java/io/grpc/internal/InternalSubchannel.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import io.grpc.ChannelLogger;
import io.grpc.ChannelLogger.ChannelLogLevel;
import io.grpc.ClientStreamTracer;
import io.grpc.ClientTransportFilter;
import io.grpc.ConnectivityState;
import io.grpc.ConnectivityStateInfo;
import io.grpc.EquivalentAddressGroup;
Expand Down Expand Up @@ -77,6 +78,8 @@ final class InternalSubchannel implements InternalInstrumented<ChannelStats>, Tr
private final ChannelTracer channelTracer;
private final ChannelLogger channelLogger;

private final List<ClientTransportFilter> transportFilters;

/**
* All field must be mutated in the syncContext.
*/
Expand Down Expand Up @@ -159,7 +162,8 @@ protected void handleNotInUse() {
ClientTransportFactory transportFactory, ScheduledExecutorService scheduledExecutor,
Supplier<Stopwatch> stopwatchSupplier, SynchronizationContext syncContext, Callback callback,
InternalChannelz channelz, CallTracer callsTracer, ChannelTracer channelTracer,
InternalLogId logId, ChannelLogger channelLogger) {
InternalLogId logId, ChannelLogger channelLogger,
List<ClientTransportFilter> transportFilters) {
Preconditions.checkNotNull(addressGroups, "addressGroups");
Preconditions.checkArgument(!addressGroups.isEmpty(), "addressGroups is empty");
checkListHasNoNulls(addressGroups, "addressGroups contains null entry");
Expand All @@ -180,6 +184,7 @@ protected void handleNotInUse() {
this.channelTracer = Preconditions.checkNotNull(channelTracer, "channelTracer");
this.logId = Preconditions.checkNotNull(logId, "logId");
this.channelLogger = Preconditions.checkNotNull(channelLogger, "channelLogger");
this.transportFilters = transportFilters;
}

ChannelLogger getChannelLogger() {
Expand Down Expand Up @@ -539,6 +544,15 @@ private class TransportListener implements ManagedClientTransport.Listener {
this.transport = transport;
}

@Override
public Attributes filterTransport(Attributes attributes) {
for (ClientTransportFilter filter : transportFilters) {
attributes = Preconditions.checkNotNull(filter.transportReady(attributes),
"Filter %s returned null", filter);
}
return attributes;
}

@Override
public void transportReady() {
channelLogger.log(ChannelLogLevel.INFO, "READY");
Expand Down Expand Up @@ -607,6 +621,9 @@ public void transportTerminated() {
channelLogger.log(ChannelLogLevel.INFO, "{0} Terminated", transport.getLogId());
channelz.removeClientSocket(transport);
handleTransportInUseState(transport, false);
for (ClientTransportFilter filter : transportFilters) {
filter.transportTerminated(transport.getAttributes());
}
syncContext.execute(new Runnable() {
@Override
public void run() {
Expand Down
15 changes: 13 additions & 2 deletions core/src/main/java/io/grpc/internal/ManagedChannelImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import io.grpc.ClientInterceptor;
import io.grpc.ClientInterceptors;
import io.grpc.ClientStreamTracer;
import io.grpc.ClientTransportFilter;
import io.grpc.CompressorRegistry;
import io.grpc.ConnectivityState;
import io.grpc.ConnectivityStateInfo;
Expand Down Expand Up @@ -209,6 +210,8 @@ public void uncaughtException(Thread t, Throwable e) {
* {@link RealChannel}.
*/
private final Channel interceptorChannel;

private final List<ClientTransportFilter> transportFilters;
@Nullable private final String userAgent;

// Only null after channel is terminated. Must be assigned from the syncContext.
Expand Down Expand Up @@ -661,6 +664,7 @@ ClientStream newSubstream(
channel = builder.binlog.wrapChannel(channel);
}
this.interceptorChannel = ClientInterceptors.intercept(channel, interceptors);
this.transportFilters = new ArrayList<>(builder.transportFilters);
this.stopwatchSupplier = checkNotNull(stopwatchSupplier, "stopwatchSupplier");
if (builder.idleTimeoutMillis == IDLE_TIMEOUT_MILLIS_DISABLE) {
this.idleTimeoutMillis = builder.idleTimeoutMillis;
Expand Down Expand Up @@ -1566,7 +1570,8 @@ void onStateChange(InternalSubchannel is, ConnectivityStateInfo newState) {
callTracerFactory.create(),
subchannelTracer,
subchannelLogId,
subchannelLogger);
subchannelLogger,
transportFilters);
oobChannelTracer.reportEvent(new ChannelTrace.Event.Builder()
.setDescription("Child Subchannel created")
.setSeverity(ChannelTrace.Event.Severity.CT_INFO)
Expand Down Expand Up @@ -1990,7 +1995,8 @@ void onNotInUse(InternalSubchannel is) {
callTracerFactory.create(),
subchannelTracer,
subchannelLogId,
subchannelLogger);
subchannelLogger,
transportFilters);

channelTracer.reportEvent(new ChannelTrace.Event.Builder()
.setDescription("Child Subchannel started")
Expand Down Expand Up @@ -2148,6 +2154,11 @@ public void transportReady() {
// Don't care
}

@Override
public Attributes filterTransport(Attributes attributes) {
return attributes;
}

@Override
public void transportInUse(final boolean inUse) {
inUseStateAggregator.updateObjectInUse(delayedTransport, inUse);
Expand Down
20 changes: 15 additions & 5 deletions core/src/main/java/io/grpc/internal/ManagedChannelImplBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package io.grpc.internal;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
Expand All @@ -27,6 +28,7 @@
import io.grpc.CallCredentials;
import io.grpc.ChannelCredentials;
import io.grpc.ClientInterceptor;
import io.grpc.ClientTransportFilter;
import io.grpc.CompressorRegistry;
import io.grpc.DecompressorRegistry;
import io.grpc.EquivalentAddressGroup;
Expand Down Expand Up @@ -137,6 +139,8 @@ public static ManagedChannelBuilder<?> forTarget(String target) {
private final List<ClientInterceptor> interceptors = new ArrayList<>();
NameResolverRegistry nameResolverRegistry = NameResolverRegistry.getDefaultRegistry();

final List<ClientTransportFilter> transportFilters = new ArrayList<>();

final String target;
@Nullable
final ChannelCredentials channelCredentials;
Expand Down Expand Up @@ -267,11 +271,11 @@ public ManagedChannelImplBuilder(
String target, @Nullable ChannelCredentials channelCreds, @Nullable CallCredentials callCreds,
ClientTransportFactoryBuilder clientTransportFactoryBuilder,
@Nullable ChannelBuilderDefaultPortProvider channelBuilderDefaultPortProvider) {
this.target = Preconditions.checkNotNull(target, "target");
this.target = checkNotNull(target, "target");
this.channelCredentials = channelCreds;
this.callCredentials = callCreds;
this.clientTransportFactoryBuilder = Preconditions
.checkNotNull(clientTransportFactoryBuilder, "clientTransportFactoryBuilder");
this.clientTransportFactoryBuilder = checkNotNull(clientTransportFactoryBuilder,
"clientTransportFactoryBuilder");
this.directServerAddress = null;

if (channelBuilderDefaultPortProvider != null) {
Expand Down Expand Up @@ -323,8 +327,8 @@ public ManagedChannelImplBuilder(SocketAddress directServerAddress, String autho
this.target = makeTargetStringForDirectAddress(directServerAddress);
this.channelCredentials = channelCreds;
this.callCredentials = callCreds;
this.clientTransportFactoryBuilder = Preconditions
.checkNotNull(clientTransportFactoryBuilder, "clientTransportFactoryBuilder");
this.clientTransportFactoryBuilder = checkNotNull(clientTransportFactoryBuilder,
"clientTransportFactoryBuilder");
this.directServerAddress = directServerAddress;
NameResolverRegistry reg = new NameResolverRegistry();
reg.register(new DirectAddressNameResolverProvider(directServerAddress,
Expand Down Expand Up @@ -374,6 +378,12 @@ public ManagedChannelImplBuilder intercept(ClientInterceptor... interceptors) {
return intercept(Arrays.asList(interceptors));
}

@Override
public ManagedChannelImplBuilder addTransportFilter(ClientTransportFilter hook) {
transportFilters.add(checkNotNull(hook, "transport filter"));
return this;
}

@Deprecated
@Override
public ManagedChannelImplBuilder nameResolverFactory(NameResolver.Factory resolverFactory) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package io.grpc.internal;

import io.grpc.Attributes;
import io.grpc.Status;
import javax.annotation.CheckReturnValue;
import javax.annotation.Nullable;
Expand Down Expand Up @@ -104,5 +105,11 @@ interface Listener {
* at least one stream.
*/
void transportInUse(boolean inUse);

/**
* Called just before {@link #transportReady} to allow direct modification of transport
* Attributes.
*/
Attributes filterTransport(Attributes attributes);
}
}
5 changes: 5 additions & 0 deletions core/src/main/java/io/grpc/internal/OobChannel.java
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,11 @@ public void transportReady() {
// Don't care
}

@Override
public Attributes filterTransport(Attributes attributes) {
return attributes;
}

@Override
public void transportInUse(boolean inUse) {
// Don't care
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import io.grpc.internal.TestUtils.MockClientTransportInfo;
import java.net.SocketAddress;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
Expand Down Expand Up @@ -1360,7 +1361,8 @@ private void createInternalSubchannel(EquivalentAddressGroup ... addrs) {
channelz, CallTracer.getDefaultFactory().create(),
subchannelTracer,
logId,
new ChannelLoggerImpl(subchannelTracer, fakeClock.getTimeProvider()));
new ChannelLoggerImpl(subchannelTracer, fakeClock.getTimeProvider()),
Collections.emptyList());
}

private void assertNoCallbackInvoke() {
Expand Down
44 changes: 44 additions & 0 deletions core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@
import io.grpc.ClientInterceptors;
import io.grpc.ClientStreamTracer;
import io.grpc.ClientStreamTracer.StreamInfo;
import io.grpc.ClientTransportFilter;
import io.grpc.CompositeChannelCredentials;
import io.grpc.ConnectivityState;
import io.grpc.ConnectivityStateInfo;
Expand Down Expand Up @@ -139,6 +140,7 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nullable;
Expand Down Expand Up @@ -4240,6 +4242,48 @@ public void createResolvingOobChannel() throws Exception {
}
}

@Test
public void transportFilters() {

final AtomicInteger readyCallbackCalled = new AtomicInteger(0);
final AtomicInteger terminationCallbackCalled = new AtomicInteger(0);
ClientTransportFilter transportFilter = new ClientTransportFilter() {
@Override
public Attributes transportReady(Attributes transportAttrs) {
readyCallbackCalled.incrementAndGet();
return transportAttrs;
}

@Override
public void transportTerminated(Attributes transportAttrs) {
terminationCallbackCalled.incrementAndGet();
}
};

channelBuilder.addTransportFilter(transportFilter);
assertEquals(0, readyCallbackCalled.get());

createChannel();
final Subchannel subchannel =
createSubchannelSafely(helper, addressGroup, Attributes.EMPTY, subchannelStateListener);
requestConnectionSafely(helper, subchannel);
verify(mockTransportFactory)
.newClientTransport(
any(SocketAddress.class), any(ClientTransportOptions.class), any(ChannelLogger.class));
MockClientTransportInfo transportInfo = transports.poll();
ManagedClientTransport.Listener transportListener = transportInfo.listener;

transportListener.filterTransport(Attributes.EMPTY);
transportListener.transportReady();
assertEquals(1, readyCallbackCalled.get());
assertEquals(0, terminationCallbackCalled.get());

transportListener.transportShutdown(Status.OK);

transportListener.transportTerminated();
assertEquals(1, terminationCallbackCalled.get());
}

private static final class FakeBackoffPolicyProvider implements BackoffPolicy.Provider {
@Override
public BackoffPolicy get() {
Expand Down
Loading