Skip to content

Commit 14585f1

Browse files
committed
changed orca listener interface to use separate listener interfaces for per-request and out-of-band cases
1 parent 85bd6de commit 14585f1

File tree

3 files changed

+78
-63
lines changed

3 files changed

+78
-63
lines changed

xds/src/main/java/io/grpc/xds/OrcaUtil.java

Lines changed: 52 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -83,33 +83,35 @@ public ClientStreamTracer newClientStreamTracer(StreamInfo info, Metadata header
8383
};
8484

8585
/**
86-
* Creates a new {@link ClientStreamTracer.Factory} with provided {@link OrcaReportListener}
87-
* installed to receive callback when a per-request ORCA report is received.
86+
* Creates a new {@link ClientStreamTracer.Factory} with provided
87+
* {@link OrcaPerRequestReportListener} installed to receive callback when a per-request
88+
* ORCA report is received.
8889
*
8990
* @param listener contains the callback to be invoked when a per-request ORCA report is
9091
* received.
9192
*/
9293
public static ClientStreamTracer.Factory newOrcaClientStreamTracerFactory(
93-
OrcaReportListener listener) {
94+
OrcaPerRequestReportListener listener) {
9495
return newOrcaClientStreamTracerFactory(NOOP_CLIENT_STREAM_TRACER_FACTORY, listener);
9596
}
9697

9798
/**
98-
* Creates a new {@link ClientStreamTracer.Factory} with provided {@link OrcaReportListener}
99-
* installed to receive callback when a per-request ORCA report is received.
99+
* Creates a new {@link ClientStreamTracer.Factory} with provided
100+
* {@link OrcaPerRequestReportListener} installed to receive callback when a per-request
101+
* ORCA report is received.
100102
*
101103
* @param delegate the delegate factory to produce other client stream tracing.
102104
* @param listener contains the callback to be invoked when a per-request ORCA report is
103105
* received.
104106
*/
105107
public static ClientStreamTracer.Factory newOrcaClientStreamTracerFactory(
106-
ClientStreamTracer.Factory delegate, OrcaReportListener listener) {
108+
ClientStreamTracer.Factory delegate, OrcaPerRequestReportListener listener) {
107109
return new OrcaReportingTracerFactory(delegate, listener);
108110
}
109111

110112
/**
111-
* Creates a new {@link LoadBalancer.Helper} with provided {@link OrcaReportListener} installed to
112-
* receive callback when an out-of-band ORCA report is received.
113+
* Creates a new {@link LoadBalancer.Helper} with provided {@link OrcaOobReportListener}
114+
* installed to receive callback when an out-of-band ORCA report is received.
113115
*
114116
* <p>Note the original {@code LoadBalancer} must call returned helper's
115117
* {@code Helper.createSubchannel()} from its SynchronizationContext, or it will throw.
@@ -124,7 +126,7 @@ public static ClientStreamTracer.Factory newOrcaClientStreamTracerFactory(
124126
*/
125127
public static OrcaReportingHelperWrapper newOrcaReportingHelperWrapper(
126128
LoadBalancer.Helper delegate,
127-
OrcaReportListener listener,
129+
OrcaOobReportListener listener,
128130
BackoffPolicy.Provider backoffPolicyProvider,
129131
Supplier<Stopwatch> stopwatchSupplier) {
130132
final OrcaReportingHelper orcaHelper =
@@ -144,23 +146,38 @@ public Helper asHelper() {
144146
}
145147

146148
/**
147-
* The listener interface for receiving backend ORCA reports. The class that is interested in
148-
* processing backend cost metrics implements this interface, and the object created with that
149-
* class is registered with a component, using methods in {@link OrcaUtil}. When an ORCA report is
150-
* received, that object's {@code onLoadReport} method is invoked.
149+
* The listener interface for receiving per-request ORCA reports from backends. The class that
150+
* is interested in processing backend cost metrics implements this interface, and the object
151+
* created with that class is registered with a component, using methods in {@link OrcaUtil}.
152+
* When an ORCA report is received, that object's {@code onLoadReport} method is invoked.
151153
*/
152-
public interface OrcaReportListener {
154+
public interface OrcaPerRequestReportListener {
153155

154156
/**
155-
* Invoked when an ORCA report is received.
157+
* Invoked when an per-request ORCA report is received.
156158
*
157-
* <p>For out-of-band reporting, the actual reporting might be more frequently and the reports
158-
* might contain more entries of named cost metrics than configured due to other load balancing
159-
* polices requesting for more frequent and detailed reports.
159+
* @param report load report in the format of ORCA format.
160160
*/
161161
void onLoadReport(OrcaLoadReport report);
162162
}
163163

164+
/**
165+
* The listener interface for receiving out-of-band ORCA reports from backends. The class that
166+
* is interested in processing backend cost metrics implements this interface, and the object
167+
* created with that class is registered with a component, using methods in {@link OrcaUtil}.
168+
* When an ORCA report is received, that object's {@code onLoadReport} method is invoked.
169+
*/
170+
public interface OrcaOobReportListener {
171+
172+
/**
173+
* Invoked when an out-of-band ORCA report is received.
174+
*
175+
* @param subchannel the subchannel over which the connection to a backend is established.
176+
* @param report load report in the format of ORCA protocol.
177+
*/
178+
void onLoadReport(Subchannel subchannel, OrcaLoadReport report);
179+
}
180+
164181
/**
165182
* The blueprint for {@link LoadBalancer.Helper} with the capability of allowing {@link
166183
* LoadBalancer}s interested in receiving out-of-band ORCA reports to update the reporting
@@ -204,10 +221,10 @@ static final class OrcaReportingTracerFactory extends ClientStreamTracer.Factory
204221
private static final CallOptions.Key<OrcaReportBroker> ORCA_REPORT_BROKER_KEY =
205222
CallOptions.Key.create("internal-orca-report-broker");
206223
private final ClientStreamTracer.Factory delegate;
207-
private final OrcaReportListener listener;
224+
private final OrcaPerRequestReportListener listener;
208225

209226
OrcaReportingTracerFactory(
210-
ClientStreamTracer.Factory delegate, OrcaReportListener listener) {
227+
ClientStreamTracer.Factory delegate, OrcaPerRequestReportListener listener) {
211228
this.delegate = checkNotNull(delegate, "delegate");
212229
this.listener = checkNotNull(listener, "listener");
213230
}
@@ -250,19 +267,19 @@ public void inboundTrailers(Metadata trailers) {
250267
}
251268

252269
/**
253-
* An {@link OrcaReportBroker} instance holds registered {@link OrcaReportListener}s and invoke
254-
* all of them when an {@link OrcaLoadReport} is received.
270+
* A container class to hold registered {@link OrcaPerRequestReportListener}s and invoke all
271+
* of them when an {@link OrcaLoadReport} is received.
255272
*/
256273
private static final class OrcaReportBroker {
257274

258-
private final List<OrcaReportListener> listeners = new ArrayList<>();
275+
private final List<OrcaPerRequestReportListener> listeners = new ArrayList<>();
259276

260-
void addListener(OrcaReportListener listener) {
277+
void addListener(OrcaPerRequestReportListener listener) {
261278
listeners.add(listener);
262279
}
263280

264281
void onReport(OrcaLoadReport report) {
265-
for (OrcaReportListener listener : listeners) {
282+
for (OrcaPerRequestReportListener listener : listeners) {
266283
listener.onLoadReport(report);
267284
}
268285
}
@@ -275,12 +292,12 @@ void onReport(OrcaLoadReport report) {
275292
*/
276293
private static final class OrcaReportingHelper
277294
extends ForwardingLoadBalancerHelper
278-
implements OrcaReportListener {
295+
implements OrcaOobReportListener {
279296

280297
private static final CreateSubchannelArgs.Key<OrcaReportingState> ORCA_REPORTING_STATE_KEY =
281298
CreateSubchannelArgs.Key.create("internal-orca-reporting-state");
282299
private final LoadBalancer.Helper delegate;
283-
private final OrcaReportListener listener;
300+
private final OrcaOobReportListener listener;
284301
private final SynchronizationContext syncContext;
285302
private final BackoffPolicy.Provider backoffPolicyProvider;
286303
private final Supplier<Stopwatch> stopwatchSupplier;
@@ -289,7 +306,7 @@ private static final class OrcaReportingHelper
289306
private OrcaReportingConfig orcaConfig;
290307

291308
OrcaReportingHelper(LoadBalancer.Helper delegate,
292-
OrcaReportListener listener,
309+
OrcaOobReportListener listener,
293310
BackoffPolicy.Provider backoffPolicyProvider,
294311
Supplier<Stopwatch> stopwatchSupplier) {
295312
this.delegate = checkNotNull(delegate, "delegate");
@@ -315,15 +332,14 @@ public Subchannel createSubchannel(CreateSubchannelArgs args) {
315332
// create subchannel to.
316333
orcaState = new OrcaReportingState(
317334
this,
318-
new OrcaReportBroker(),
319335
args.getStateListener(),
320336
syncContext,
321337
delegate().getScheduledExecutorService());
322338
orcaStates.add(orcaState);
323339
args = args.toBuilder().addOption(ORCA_REPORTING_STATE_KEY, orcaState).build();
324340
augmented = true;
325341
}
326-
orcaState.broker.addListener(this);
342+
orcaState.listeners.add(this);
327343
Subchannel subchannel;
328344
if (augmented) {
329345
subchannel = super.createSubchannel(args.toBuilder().setStateListener(orcaState).build());
@@ -345,9 +361,9 @@ void setReportingConfig(OrcaReportingConfig config) {
345361
}
346362

347363
@Override
348-
public void onLoadReport(OrcaLoadReport report) {
364+
public void onLoadReport(Subchannel subchannel, OrcaLoadReport report) {
349365
if (orcaConfig != null) {
350-
listener.onLoadReport(report);
366+
listener.onLoadReport(subchannel, report);
351367
}
352368
}
353369

@@ -362,7 +378,7 @@ private final class OrcaReportingState implements SubchannelStateListener {
362378
private final SubchannelStateListener stateListener;
363379
private final SynchronizationContext syncContext;
364380
private final ScheduledExecutorService timeService;
365-
private final OrcaReportBroker broker;
381+
private final List<OrcaOobReportListener> listeners = new ArrayList<>();
366382
@Nullable
367383
private Subchannel subchannel;
368384
@Nullable
@@ -387,12 +403,10 @@ public void run() {
387403

388404
OrcaReportingState(
389405
OrcaReportingHelper orcaHelper,
390-
OrcaReportBroker broker,
391406
SubchannelStateListener stateListener,
392407
SynchronizationContext syncContext,
393408
ScheduledExecutorService timeService) {
394409
this.orcaHelper = checkNotNull(orcaHelper, "orcaHelper");
395-
this.broker = checkNotNull(broker, "broker");
396410
this.stateListener = checkNotNull(stateListener, "stateListener");
397411
this.syncContext = checkNotNull(syncContext, "syncContext");
398412
this.timeService = checkNotNull(timeService, "timeService");
@@ -544,7 +558,9 @@ void handleResponse(OrcaLoadReport response) {
544558
callHasResponded = true;
545559
backoffPolicy = null;
546560
subchannelLogger.log(ChannelLogLevel.DEBUG, "Received an ORCA report: {0}", response);
547-
broker.onReport(response);
561+
for (OrcaOobReportListener listener : listeners) {
562+
listener.onLoadReport(subchannel, response);
563+
}
548564
call.request(1);
549565
}
550566

xds/src/test/java/io/grpc/xds/OrcaUtilOobReportingTest.java

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@
6464
import io.grpc.internal.FakeClock;
6565
import io.grpc.stub.StreamObserver;
6666
import io.grpc.testing.GrpcCleanupRule;
67-
import io.grpc.xds.OrcaUtil.OrcaReportListener;
67+
import io.grpc.xds.OrcaUtil.OrcaOobReportListener;
6868
import io.grpc.xds.OrcaUtil.OrcaReportingConfig;
6969
import io.grpc.xds.OrcaUtil.OrcaReportingHelperWrapper;
7070
import java.net.SocketAddress;
@@ -123,7 +123,7 @@ public void uncaughtException(Thread t, Throwable e) {
123123
private final FakeClock fakeClock = new FakeClock();
124124
private final Helper origHelper = mock(Helper.class, delegatesTo(new FakeHelper()));
125125
@Mock
126-
private OrcaReportListener mockOrcaListener;
126+
private OrcaOobReportListener mockOrcaListener;
127127
@Mock private BackoffPolicy.Provider backoffPolicyProvider;
128128
@Mock private BackoffPolicy backoffPolicy1;
129129
@Mock private BackoffPolicy backoffPolicy2;
@@ -302,7 +302,7 @@ public void typicalWorkflow() {
302302
OrcaLoadReport report = OrcaLoadReport.getDefaultInstance();
303303
serverCall.responseObserver.onNext(report);
304304
assertLog(subchannel.logs, "DEBUG: Received an ORCA report: " + report);
305-
verify(mockOrcaListener, times(i + 1)).onLoadReport(eq(report));
305+
verify(mockOrcaListener).onLoadReport(same(subchannel), eq(report));
306306
}
307307

308308
for (int i = 0; i < NUM_SUBCHANNELS; i++) {
@@ -367,7 +367,7 @@ public void orcReportingDisabledWhenServiceNotImplemented() {
367367
OrcaLoadReport report = OrcaLoadReport.getDefaultInstance();
368368
serverCall.responseObserver.onNext(report);
369369
assertLog(subchannel.logs, "DEBUG: Received an ORCA report: " + report);
370-
verify(mockOrcaListener).onLoadReport(eq(report));
370+
verify(mockOrcaListener).onLoadReport(same(subchannel), eq(report));
371371

372372
verifyZeroInteractions(backoffPolicyProvider);
373373
}
@@ -416,7 +416,7 @@ public void orcaReportingStreamClosedAndRetried() {
416416
OrcaLoadReport report = OrcaLoadReport.getDefaultInstance();
417417
orcaServiceImp.calls.peek().responseObserver.onNext(report);
418418
assertLog(subchannel.logs, "DEBUG: Received an ORCA report: " + report);
419-
inOrder.verify(mockOrcaListener).onLoadReport(eq(report));
419+
inOrder.verify(mockOrcaListener).onLoadReport(same(subchannel), eq(report));
420420

421421
// Server closes the ORCA reporting RPC after a response, will restart immediately.
422422
orcaServiceImp.calls.poll().responseObserver.onCompleted();
@@ -447,10 +447,10 @@ public void orcaReportingStreamClosedAndRetried() {
447447
*/
448448
@Test
449449
public void twoLevelPoliciesReceiveSameReport() {
450-
OrcaReportListener parentListener = mockOrcaListener;
450+
OrcaOobReportListener parentListener = mockOrcaListener;
451451
OrcaReportingHelperWrapper parentHelperWrapper = orcaHelperWrapper;
452452
parentHelperWrapper.setReportingConfig(ORCA_REPORTING_CONFIG);
453-
OrcaReportListener childListener = mock(OrcaReportListener.class);
453+
OrcaOobReportListener childListener = mock(OrcaOobReportListener.class);
454454
OrcaReportingHelperWrapper childHelperWrapper =
455455
OrcaUtil.newOrcaReportingHelperWrapper(
456456
parentHelperWrapper.asHelper(),
@@ -475,8 +475,8 @@ public void twoLevelPoliciesReceiveSameReport() {
475475
assertLog(subchannel.logs, "DEBUG: Received an ORCA report: " + report);
476476
ArgumentCaptor<OrcaLoadReport> parentReportCaptor = ArgumentCaptor.forClass(null);
477477
ArgumentCaptor<OrcaLoadReport> childReportCaptor = ArgumentCaptor.forClass(null);
478-
verify(parentListener).onLoadReport(parentReportCaptor.capture());
479-
verify(childListener).onLoadReport(childReportCaptor.capture());
478+
verify(parentListener).onLoadReport(same(subchannel), parentReportCaptor.capture());
479+
verify(childListener).onLoadReport(same(subchannel), childReportCaptor.capture());
480480
assertThat(parentReportCaptor.getValue()).isEqualTo(report);
481481
assertThat(childReportCaptor.getValue()).isSameInstanceAs(parentReportCaptor.getValue());
482482
}
@@ -491,7 +491,7 @@ public void twoLevelPoliciesReceiveSameReport() {
491491
public void reportMostEntriesAndMostFrequentIntervalRequested() {
492492
OrcaReportingHelperWrapper parentHelperWrapper = orcaHelperWrapper;
493493
parentHelperWrapper.setReportingConfig(ORCA_REPORTING_CONFIG);
494-
OrcaReportListener childListener = mock(OrcaReportListener.class);
494+
OrcaOobReportListener childListener = mock(OrcaOobReportListener.class);
495495
OrcaReportingConfig config =
496496
OrcaReportingConfig.newBuilder()
497497
.setReportInterval(12, TimeUnit.NANOSECONDS)

xds/src/test/java/io/grpc/xds/OrcaUtilPerRequestReportingTest.java

Lines changed: 16 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828
import io.envoyproxy.udpa.data.orca.v1.OrcaLoadReport;
2929
import io.grpc.ClientStreamTracer;
3030
import io.grpc.Metadata;
31-
import io.grpc.xds.OrcaUtil.OrcaReportListener;
31+
import io.grpc.xds.OrcaUtil.OrcaPerRequestReportListener;
3232
import io.grpc.xds.OrcaUtil.OrcaReportingTracerFactory;
3333
import org.junit.Test;
3434
import org.junit.runner.RunWith;
@@ -50,7 +50,7 @@ public class OrcaUtilPerRequestReportingTest {
5050
*/
5151
@Test
5252
public void singlePolicyPerRequestListener() {
53-
OrcaReportListener mockListener = mock(OrcaReportListener.class);
53+
OrcaPerRequestReportListener mockListener = mock(OrcaPerRequestReportListener.class);
5454
// Use a mocked noop stream tracer factory as the original stream tracer factory.
5555
ClientStreamTracer.Factory fakeDelegateFactory = mock(ClientStreamTracer.Factory.class);
5656
ClientStreamTracer fakeTracer = mock(ClientStreamTracer.class);
@@ -92,26 +92,25 @@ public void singlePolicyPerRequestListener() {
9292
*/
9393
@Test
9494
public void twoLevelPoliciesPerRequestListeners() {
95-
OrcaReportListener childListener = mock(OrcaReportListener.class);
96-
ClientStreamTracer.Factory childFactory =
95+
OrcaPerRequestReportListener parentListener = mock(OrcaPerRequestReportListener.class);
96+
ClientStreamTracer.Factory parentFactory =
9797
mock(ClientStreamTracer.Factory.class,
98-
delegatesTo(OrcaUtil.newOrcaClientStreamTracerFactory(childListener)));
98+
delegatesTo(OrcaUtil.newOrcaClientStreamTracerFactory(parentListener)));
9999

100-
OrcaReportListener parentListener = mock(OrcaReportListener.class);
101-
ClientStreamTracer.Factory parentFactory =
102-
OrcaUtil.newOrcaClientStreamTracerFactory(childFactory, parentListener);
103-
// Parent factory will augment the StreamInfo with a broker added and pass it to the child
104-
// factory.
105-
ClientStreamTracer parentTracer =
106-
parentFactory.newClientStreamTracer(STREAM_INFO, new Metadata());
100+
OrcaPerRequestReportListener childListener = mock(OrcaPerRequestReportListener.class);
101+
ClientStreamTracer.Factory childFactory =
102+
OrcaUtil.newOrcaClientStreamTracerFactory(parentFactory, childListener);
103+
// Child factory will augment the StreamInfo and pass it to the parent factory.
104+
ClientStreamTracer childTracer =
105+
childFactory.newClientStreamTracer(STREAM_INFO, new Metadata());
107106
ArgumentCaptor<ClientStreamTracer.StreamInfo> streamInfoCaptor = ArgumentCaptor.forClass(null);
108-
verify(childFactory).newClientStreamTracer(streamInfoCaptor.capture(), any(Metadata.class));
109-
ClientStreamTracer.StreamInfo childStreamInfo = streamInfoCaptor.getValue();
110-
assertThat(childStreamInfo).isNotEqualTo(STREAM_INFO);
107+
verify(parentFactory).newClientStreamTracer(streamInfoCaptor.capture(), any(Metadata.class));
108+
ClientStreamTracer.StreamInfo parentStreamInfo = streamInfoCaptor.getValue();
109+
assertThat(parentStreamInfo).isNotEqualTo(STREAM_INFO);
111110

112111
// When the trailer does not contain ORCA report, no listener callback will be invoked.
113112
Metadata trailer = new Metadata();
114-
parentTracer.inboundTrailers(trailer);
113+
childTracer.inboundTrailers(trailer);
115114
verify(parentListener, never()).onLoadReport(any(OrcaLoadReport.class));
116115
verify(childListener, never()).onLoadReport(any(OrcaLoadReport.class));
117116

@@ -121,7 +120,7 @@ public void twoLevelPoliciesPerRequestListeners() {
121120
trailer.put(
122121
OrcaReportingTracerFactory.ORCA_ENDPOINT_LOAD_METRICS_KEY,
123122
OrcaLoadReport.getDefaultInstance());
124-
parentTracer.inboundTrailers(trailer);
123+
childTracer.inboundTrailers(trailer);
125124
ArgumentCaptor<OrcaLoadReport> parentReportCap = ArgumentCaptor.forClass(null);
126125
ArgumentCaptor<OrcaLoadReport> childReportCap = ArgumentCaptor.forClass(null);
127126
verify(parentListener).onLoadReport(parentReportCap.capture());

0 commit comments

Comments
 (0)