Skip to content

Commit a40996c

Browse files
committed
Add ClientTransportFilter
1 parent 15fc70b commit a40996c

File tree

7 files changed

+146
-16
lines changed

7 files changed

+146
-16
lines changed
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
package io.grpc;
2+
3+
/**
4+
* Listens on the client transport life-cycle events. These filters do not have the capability
5+
* to modify the channels or transport life-cycle event behavior, but they can be useful hooks
6+
* for transport observability. Multiple filters may be registered to the client.
7+
*/
8+
@ExperimentalApi("https://gitub.com/grpc/grpc-java/issues/TODO")
9+
public abstract class ClientTransportFilter {
10+
/**
11+
* Called when a transport is ready to accept traffic (when a connection has been established).
12+
* The default implementation is a no-op.
13+
*/
14+
public void transportReady() {
15+
16+
}
17+
18+
/**
19+
* Called when a transport is shutting down. Shutdown could have been caused by an error or normal
20+
* operation.
21+
* This is called prior to {@link #transportTerminated}.
22+
* Default implementation is a no-op.
23+
*
24+
* @param s the reason for the shutdown.
25+
*/
26+
public void transportShutdown(Status s) {
27+
28+
}
29+
30+
/**
31+
* Called when a transport completed shutting down. All resources have been released.
32+
* All streams have either been closed or transferred off this transport.
33+
* Default implementation is a no-op
34+
*/
35+
public void transportTerminated() {
36+
37+
}
38+
39+
}

api/src/main/java/io/grpc/ManagedChannelBuilder.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,18 @@ public T offloadExecutor(Executor executor) {
159159
*/
160160
public abstract T intercept(ClientInterceptor... interceptors);
161161

162+
/**
163+
* Adds a {@link ClientTransportFilter}. The order of filters being added is the order they will
164+
* be executed
165+
*
166+
* @return this
167+
* @since 1.60.0
168+
*/
169+
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/TODO")
170+
public T addTransportFilter(ClientTransportFilter filter){
171+
throw new UnsupportedOperationException();
172+
}
173+
162174
/**
163175
* Provides a custom {@code User-Agent} for the application.
164176
*

core/src/main/java/io/grpc/internal/InternalSubchannel.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import io.grpc.ChannelLogger;
3636
import io.grpc.ChannelLogger.ChannelLogLevel;
3737
import io.grpc.ClientStreamTracer;
38+
import io.grpc.ClientTransportFilter;
3839
import io.grpc.ConnectivityState;
3940
import io.grpc.ConnectivityStateInfo;
4041
import io.grpc.EquivalentAddressGroup;
@@ -77,6 +78,8 @@ final class InternalSubchannel implements InternalInstrumented<ChannelStats>, Tr
7778
private final ChannelTracer channelTracer;
7879
private final ChannelLogger channelLogger;
7980

81+
private final List<ClientTransportFilter> transportFilters;
82+
8083
/**
8184
* All field must be mutated in the syncContext.
8285
*/
@@ -159,7 +162,7 @@ protected void handleNotInUse() {
159162
ClientTransportFactory transportFactory, ScheduledExecutorService scheduledExecutor,
160163
Supplier<Stopwatch> stopwatchSupplier, SynchronizationContext syncContext, Callback callback,
161164
InternalChannelz channelz, CallTracer callsTracer, ChannelTracer channelTracer,
162-
InternalLogId logId, ChannelLogger channelLogger) {
165+
InternalLogId logId, ChannelLogger channelLogger, List<ClientTransportFilter> transportFilters) {
163166
Preconditions.checkNotNull(addressGroups, "addressGroups");
164167
Preconditions.checkArgument(!addressGroups.isEmpty(), "addressGroups is empty");
165168
checkListHasNoNulls(addressGroups, "addressGroups contains null entry");
@@ -180,6 +183,7 @@ protected void handleNotInUse() {
180183
this.channelTracer = Preconditions.checkNotNull(channelTracer, "channelTracer");
181184
this.logId = Preconditions.checkNotNull(logId, "logId");
182185
this.channelLogger = Preconditions.checkNotNull(channelLogger, "channelLogger");
186+
this.transportFilters = transportFilters;
183187
}
184188

185189
ChannelLogger getChannelLogger() {
@@ -542,6 +546,9 @@ private class TransportListener implements ManagedClientTransport.Listener {
542546
@Override
543547
public void transportReady() {
544548
channelLogger.log(ChannelLogLevel.INFO, "READY");
549+
for (ClientTransportFilter filter : transportFilters) {
550+
filter.transportReady();
551+
}
545552
syncContext.execute(new Runnable() {
546553
@Override
547554
public void run() {
@@ -570,6 +577,9 @@ public void transportShutdown(final Status s) {
570577
channelLogger.log(
571578
ChannelLogLevel.INFO, "{0} SHUTDOWN with {1}", transport.getLogId(), printShortStatus(s));
572579
shutdownInitiated = true;
580+
for (ClientTransportFilter filter : transportFilters) {
581+
filter.transportShutdown(s);
582+
}
573583
syncContext.execute(new Runnable() {
574584
@Override
575585
public void run() {
@@ -607,6 +617,9 @@ public void transportTerminated() {
607617
channelLogger.log(ChannelLogLevel.INFO, "{0} Terminated", transport.getLogId());
608618
channelz.removeClientSocket(transport);
609619
handleTransportInUseState(transport, false);
620+
for (ClientTransportFilter filter : transportFilters) {
621+
filter.transportTerminated();
622+
}
610623
syncContext.execute(new Runnable() {
611624
@Override
612625
public void run() {

core/src/main/java/io/grpc/internal/ManagedChannelImpl.java

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
import io.grpc.ClientInterceptor;
4343
import io.grpc.ClientInterceptors;
4444
import io.grpc.ClientStreamTracer;
45+
import io.grpc.ClientTransportFilter;
4546
import io.grpc.CompressorRegistry;
4647
import io.grpc.ConnectivityState;
4748
import io.grpc.ConnectivityStateInfo;
@@ -209,6 +210,8 @@ public void uncaughtException(Thread t, Throwable e) {
209210
* {@link RealChannel}.
210211
*/
211212
private final Channel interceptorChannel;
213+
214+
private final List<ClientTransportFilter> transportFilters;
212215
@Nullable private final String userAgent;
213216

214217
// Only null after channel is terminated. Must be assigned from the syncContext.
@@ -582,13 +585,13 @@ ClientStream newSubstream(
582585
private final Rescheduler idleTimer;
583586

584587
ManagedChannelImpl(
585-
ManagedChannelImplBuilder builder,
586-
ClientTransportFactory clientTransportFactory,
587-
BackoffPolicy.Provider backoffPolicyProvider,
588-
ObjectPool<? extends Executor> balancerRpcExecutorPool,
589-
Supplier<Stopwatch> stopwatchSupplier,
590-
List<ClientInterceptor> interceptors,
591-
final TimeProvider timeProvider) {
588+
ManagedChannelImplBuilder builder,
589+
ClientTransportFactory clientTransportFactory,
590+
BackoffPolicy.Provider backoffPolicyProvider,
591+
ObjectPool<? extends Executor> balancerRpcExecutorPool,
592+
Supplier<Stopwatch> stopwatchSupplier,
593+
List<ClientInterceptor> interceptors,
594+
final TimeProvider timeProvider) {
592595
this.target = checkNotNull(builder.target, "target");
593596
this.logId = InternalLogId.allocate("Channel", target);
594597
this.timeProvider = checkNotNull(timeProvider, "timeProvider");
@@ -661,6 +664,7 @@ ClientStream newSubstream(
661664
channel = builder.binlog.wrapChannel(channel);
662665
}
663666
this.interceptorChannel = ClientInterceptors.intercept(channel, interceptors);
667+
this.transportFilters = builder.transportFilters;
664668
this.stopwatchSupplier = checkNotNull(stopwatchSupplier, "stopwatchSupplier");
665669
if (builder.idleTimeoutMillis == IDLE_TIMEOUT_MILLIS_DISABLE) {
666670
this.idleTimeoutMillis = builder.idleTimeoutMillis;
@@ -1566,7 +1570,8 @@ void onStateChange(InternalSubchannel is, ConnectivityStateInfo newState) {
15661570
callTracerFactory.create(),
15671571
subchannelTracer,
15681572
subchannelLogId,
1569-
subchannelLogger);
1573+
subchannelLogger,
1574+
transportFilters);
15701575
oobChannelTracer.reportEvent(new ChannelTrace.Event.Builder()
15711576
.setDescription("Child Subchannel created")
15721577
.setSeverity(ChannelTrace.Event.Severity.CT_INFO)
@@ -1990,7 +1995,8 @@ void onNotInUse(InternalSubchannel is) {
19901995
callTracerFactory.create(),
19911996
subchannelTracer,
19921997
subchannelLogId,
1993-
subchannelLogger);
1998+
subchannelLogger,
1999+
transportFilters);
19942000

19952001
channelTracer.reportEvent(new ChannelTrace.Event.Builder()
19962002
.setDescription("Child Subchannel started")

core/src/main/java/io/grpc/internal/ManagedChannelImplBuilder.java

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package io.grpc.internal;
1818

1919
import static com.google.common.base.Preconditions.checkArgument;
20+
import static com.google.common.base.Preconditions.checkNotNull;
2021

2122
import com.google.common.annotations.VisibleForTesting;
2223
import com.google.common.base.Preconditions;
@@ -27,6 +28,7 @@
2728
import io.grpc.CallCredentials;
2829
import io.grpc.ChannelCredentials;
2930
import io.grpc.ClientInterceptor;
31+
import io.grpc.ClientTransportFilter;
3032
import io.grpc.CompressorRegistry;
3133
import io.grpc.DecompressorRegistry;
3234
import io.grpc.EquivalentAddressGroup;
@@ -137,6 +139,8 @@ public static ManagedChannelBuilder<?> forTarget(String target) {
137139
private final List<ClientInterceptor> interceptors = new ArrayList<>();
138140
NameResolverRegistry nameResolverRegistry = NameResolverRegistry.getDefaultRegistry();
139141

142+
final List<ClientTransportFilter> transportFilters = new ArrayList<>();
143+
140144
final String target;
141145
@Nullable
142146
final ChannelCredentials channelCredentials;
@@ -267,11 +271,10 @@ public ManagedChannelImplBuilder(
267271
String target, @Nullable ChannelCredentials channelCreds, @Nullable CallCredentials callCreds,
268272
ClientTransportFactoryBuilder clientTransportFactoryBuilder,
269273
@Nullable ChannelBuilderDefaultPortProvider channelBuilderDefaultPortProvider) {
270-
this.target = Preconditions.checkNotNull(target, "target");
274+
this.target = checkNotNull(target, "target");
271275
this.channelCredentials = channelCreds;
272276
this.callCredentials = callCreds;
273-
this.clientTransportFactoryBuilder = Preconditions
274-
.checkNotNull(clientTransportFactoryBuilder, "clientTransportFactoryBuilder");
277+
this.clientTransportFactoryBuilder = checkNotNull(clientTransportFactoryBuilder, "clientTransportFactoryBuilder");
275278
this.directServerAddress = null;
276279

277280
if (channelBuilderDefaultPortProvider != null) {
@@ -323,8 +326,7 @@ public ManagedChannelImplBuilder(SocketAddress directServerAddress, String autho
323326
this.target = makeTargetStringForDirectAddress(directServerAddress);
324327
this.channelCredentials = channelCreds;
325328
this.callCredentials = callCreds;
326-
this.clientTransportFactoryBuilder = Preconditions
327-
.checkNotNull(clientTransportFactoryBuilder, "clientTransportFactoryBuilder");
329+
this.clientTransportFactoryBuilder = checkNotNull(clientTransportFactoryBuilder, "clientTransportFactoryBuilder");
328330
this.directServerAddress = directServerAddress;
329331
NameResolverRegistry reg = new NameResolverRegistry();
330332
reg.register(new DirectAddressNameResolverProvider(directServerAddress,
@@ -374,6 +376,12 @@ public ManagedChannelImplBuilder intercept(ClientInterceptor... interceptors) {
374376
return intercept(Arrays.asList(interceptors));
375377
}
376378

379+
@Override
380+
public ManagedChannelImplBuilder addTransportFilter(ClientTransportFilter filter) {
381+
transportFilters.add(checkNotNull(filter, "filter"));
382+
return this;
383+
}
384+
377385
@Deprecated
378386
@Override
379387
public ManagedChannelImplBuilder nameResolverFactory(NameResolver.Factory resolverFactory) {

core/src/test/java/io/grpc/internal/InternalSubchannelTest.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@
5454
import io.grpc.internal.TestUtils.MockClientTransportInfo;
5555
import java.net.SocketAddress;
5656
import java.util.Arrays;
57+
import java.util.Collections;
5758
import java.util.LinkedList;
5859
import java.util.List;
5960
import java.util.concurrent.BlockingQueue;
@@ -1360,7 +1361,8 @@ private void createInternalSubchannel(EquivalentAddressGroup ... addrs) {
13601361
channelz, CallTracer.getDefaultFactory().create(),
13611362
subchannelTracer,
13621363
logId,
1363-
new ChannelLoggerImpl(subchannelTracer, fakeClock.getTimeProvider()));
1364+
new ChannelLoggerImpl(subchannelTracer, fakeClock.getTimeProvider()),
1365+
Collections.emptyList());
13641366
}
13651367

13661368
private void assertNoCallbackInvoke() {

core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@
7272
import io.grpc.ClientInterceptors;
7373
import io.grpc.ClientStreamTracer;
7474
import io.grpc.ClientStreamTracer.StreamInfo;
75+
import io.grpc.ClientTransportFilter;
7576
import io.grpc.CompositeChannelCredentials;
7677
import io.grpc.ConnectivityState;
7778
import io.grpc.ConnectivityStateInfo;
@@ -107,6 +108,7 @@
107108
import io.grpc.ProxyDetector;
108109
import io.grpc.SecurityLevel;
109110
import io.grpc.ServerMethodDefinition;
111+
import io.grpc.ServerTransportFilter;
110112
import io.grpc.Status;
111113
import io.grpc.Status.Code;
112114
import io.grpc.StringMarshaller;
@@ -139,6 +141,7 @@
139141
import java.util.concurrent.ScheduledExecutorService;
140142
import java.util.concurrent.TimeUnit;
141143
import java.util.concurrent.atomic.AtomicBoolean;
144+
import java.util.concurrent.atomic.AtomicInteger;
142145
import java.util.concurrent.atomic.AtomicLong;
143146
import java.util.concurrent.atomic.AtomicReference;
144147
import javax.annotation.Nullable;
@@ -4240,6 +4243,53 @@ public void createResolvingOobChannel() throws Exception {
42404243
}
42414244
}
42424245

4246+
@Test
4247+
public void transportFilters() throws Exception{
4248+
4249+
final AtomicInteger readyCallbackCalled = new AtomicInteger(0);
4250+
final AtomicInteger shutdownCallbackCalled = new AtomicInteger(0);
4251+
final AtomicInteger terminationCallbackCalled = new AtomicInteger(0);
4252+
ClientTransportFilter transportFilter = new ClientTransportFilter() {
4253+
@Override
4254+
public void transportReady() {
4255+
readyCallbackCalled.incrementAndGet();
4256+
}
4257+
@Override
4258+
public void transportShutdown(Status s) {
4259+
shutdownCallbackCalled.incrementAndGet();
4260+
}
4261+
@Override
4262+
public void transportTerminated() {
4263+
terminationCallbackCalled.incrementAndGet();
4264+
}
4265+
};
4266+
4267+
channelBuilder.addTransportFilter(transportFilter);
4268+
assertEquals(0, readyCallbackCalled.get());
4269+
4270+
createChannel();
4271+
final Subchannel subchannel =
4272+
createSubchannelSafely(helper, addressGroup, Attributes.EMPTY, subchannelStateListener);
4273+
requestConnectionSafely(helper, subchannel);
4274+
verify(mockTransportFactory)
4275+
.newClientTransport(
4276+
any(SocketAddress.class), any(ClientTransportOptions.class), any(ChannelLogger.class));
4277+
MockClientTransportInfo transportInfo = transports.poll();
4278+
ManagedClientTransport.Listener transportListener = transportInfo.listener;
4279+
4280+
transportListener.transportReady();
4281+
assertEquals(1, readyCallbackCalled.get());
4282+
assertEquals(0, shutdownCallbackCalled.get());
4283+
assertEquals(0, terminationCallbackCalled.get());
4284+
4285+
transportListener.transportShutdown(Status.OK);
4286+
assertEquals(1, shutdownCallbackCalled.get());
4287+
assertEquals(0, terminationCallbackCalled.get());
4288+
4289+
transportListener.transportTerminated();
4290+
assertEquals(1, terminationCallbackCalled.get());
4291+
}
4292+
42434293
private static final class FakeBackoffPolicyProvider implements BackoffPolicy.Provider {
42444294
@Override
42454295
public BackoffPolicy get() {

0 commit comments

Comments
 (0)