Skip to content

Commit cdaea7a

Browse files
committed
fixed impl with changes made on Subchannel (SubchannelStateListener now ties with Subchannel)
1 parent f4fba45 commit cdaea7a

File tree

2 files changed

+132
-266
lines changed

2 files changed

+132
-266
lines changed

xds/src/main/java/io/grpc/xds/OrcaOobUtil.java

Lines changed: 40 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616

1717
package io.grpc.xds;
1818

19-
import static com.google.common.base.Preconditions.checkArgument;
2019
import static com.google.common.base.Preconditions.checkNotNull;
2120
import static com.google.common.base.Preconditions.checkState;
2221
import static io.grpc.ConnectivityState.IDLE;
@@ -52,6 +51,7 @@
5251
import io.grpc.internal.ExponentialBackoffPolicy;
5352
import io.grpc.internal.GrpcUtil;
5453
import io.grpc.util.ForwardingLoadBalancerHelper;
54+
import io.grpc.util.ForwardingSubchannel;
5555
import java.util.ArrayList;
5656
import java.util.HashMap;
5757
import java.util.HashSet;
@@ -147,10 +147,9 @@ public interface OrcaOobReportListener {
147147
* {@link LoadBalancer.Helper#createSubchannel(CreateSubchannelArgs)} for
148148
* {@link OrcaReportingHelperWrapper#asHelper()}, in which the listener is registered.
149149
*
150-
* @param subchannel the subchannel over which the connection to a backend is established.
151150
* @param report load report in the format of ORCA protocol.
152151
*/
153-
void onLoadReport(Subchannel subchannel, OrcaLoadReport report);
152+
void onLoadReport(OrcaLoadReport report);
154153
}
155154

156155
/**
@@ -223,24 +222,17 @@ public Subchannel createSubchannel(CreateSubchannelArgs args) {
223222
if (orcaState == null) {
224223
// Only the first load balancing policy requesting ORCA reports instantiates an
225224
// OrcaReportingState.
226-
orcaState =
227-
new OrcaReportingState(
228-
this,
229-
args.getStateListener(),
230-
syncContext,
231-
delegate().getScheduledExecutorService());
225+
orcaState = new OrcaReportingState(this, syncContext,
226+
delegate().getScheduledExecutorService());
232227
args = args.toBuilder().addOption(ORCA_REPORTING_STATE_KEY, orcaState).build();
233228
augmented = true;
234229
}
235230
orcaStates.add(orcaState);
236-
Subchannel subchannel;
231+
orcaState.listeners.add(this);
232+
Subchannel subchannel = super.createSubchannel(args);
237233
if (augmented) {
238-
subchannel = super.createSubchannel(args.toBuilder().setStateListener(orcaState).build());
239-
orcaState.init(subchannel);
240-
} else {
241-
subchannel = super.createSubchannel(args);
234+
subchannel = new SubchannelImpl(subchannel, orcaState);
242235
}
243-
orcaState.listeners.add(new SubchannelBoundListener(subchannel, this));
244236
if (orcaConfig != null) {
245237
orcaState.setReportingConfig(this, orcaConfig);
246238
}
@@ -261,9 +253,9 @@ public void run() {
261253
}
262254

263255
@Override
264-
public void onLoadReport(Subchannel subchannel, OrcaLoadReport report) {
256+
public void onLoadReport(OrcaLoadReport report) {
265257
if (orcaConfig != null) {
266-
listener.onLoadReport(subchannel, report);
258+
listener.onLoadReport(report);
267259
}
268260
}
269261

@@ -275,13 +267,14 @@ public void onLoadReport(Subchannel subchannel, OrcaLoadReport report) {
275267
private final class OrcaReportingState implements SubchannelStateListener {
276268

277269
private final OrcaReportingHelper orcaHelper;
278-
private final SubchannelStateListener stateListener;
279270
private final SynchronizationContext syncContext;
280271
private final ScheduledExecutorService timeService;
281-
private final List<SubchannelBoundListener> listeners = new ArrayList<>();
272+
private final List<OrcaOobReportListener> listeners = new ArrayList<>();
282273
private final Map<OrcaReportingHelper, OrcaReportingConfig> configs = new HashMap<>();
283274
@Nullable private Subchannel subchannel;
284275
@Nullable private ChannelLogger subchannelLogger;
276+
@Nullable
277+
private SubchannelStateListener stateListener;
285278
@Nullable private BackoffPolicy backoffPolicy;
286279
@Nullable private OrcaReportingStream orcaRpc;
287280
@Nullable private ScheduledHandle retryTimer;
@@ -299,19 +292,18 @@ public void run() {
299292

300293
OrcaReportingState(
301294
OrcaReportingHelper orcaHelper,
302-
SubchannelStateListener stateListener,
303295
SynchronizationContext syncContext,
304296
ScheduledExecutorService timeService) {
305297
this.orcaHelper = checkNotNull(orcaHelper, "orcaHelper");
306-
this.stateListener = checkNotNull(stateListener, "stateListener");
307298
this.syncContext = checkNotNull(syncContext, "syncContext");
308299
this.timeService = checkNotNull(timeService, "timeService");
309300
}
310301

311-
void init(Subchannel subchannel) {
302+
void init(Subchannel subchannel, SubchannelStateListener stateListener) {
312303
checkState(this.subchannel == null, "init() already called");
313304
this.subchannel = checkNotNull(subchannel, "subchannel");
314305
this.subchannelLogger = checkNotNull(subchannel.getChannelLogger(), "subchannelLogger");
306+
this.stateListener = checkNotNull(stateListener, "stateListener");
315307
}
316308

317309
void setReportingConfig(OrcaReportingHelper helper, OrcaReportingConfig config) {
@@ -341,12 +333,7 @@ void setReportingConfig(OrcaReportingHelper helper, OrcaReportingConfig config)
341333
}
342334

343335
@Override
344-
public void onSubchannelState(Subchannel subchannel, ConnectivityStateInfo newState) {
345-
checkArgument(
346-
subchannel == this.subchannel,
347-
"Subchannel mismatch: %s vs %s",
348-
subchannel,
349-
this.subchannel);
336+
public void onSubchannelState(ConnectivityStateInfo newState) {
350337
if (Objects.equal(state.getState(), READY) && !Objects.equal(newState.getState(), READY)) {
351338
// A connection was lost. We will reset disabled flag because ORCA service
352339
// may be available on the new connection.
@@ -358,7 +345,7 @@ public void onSubchannelState(Subchannel subchannel, ConnectivityStateInfo newSt
358345
state = newState;
359346
adjustOrcaReporting();
360347
// Propagate subchannel state update to downstream listeners.
361-
stateListener.onSubchannelState(subchannel, newState);
348+
stateListener.onSubchannelState(newState);
362349
}
363350

364351
void adjustOrcaReporting() {
@@ -461,7 +448,7 @@ void handleResponse(OrcaLoadReport response) {
461448
callHasResponded = true;
462449
backoffPolicy = null;
463450
subchannelLogger.log(ChannelLogLevel.DEBUG, "Received an ORCA report: {0}", response);
464-
for (SubchannelBoundListener listener : listeners) {
451+
for (OrcaOobReportListener listener : listeners) {
465452
listener.onLoadReport(response);
466453
}
467454
call.request(1);
@@ -514,6 +501,29 @@ public String toString() {
514501
}
515502
}
516503

504+
@VisibleForTesting
505+
static final class SubchannelImpl extends ForwardingSubchannel {
506+
507+
private final Subchannel delegate;
508+
private final OrcaReportingHelper.OrcaReportingState orcaState;
509+
510+
SubchannelImpl(Subchannel delegate, OrcaReportingHelper.OrcaReportingState orcaState) {
511+
this.delegate = checkNotNull(delegate, "delegate");
512+
this.orcaState = checkNotNull(orcaState, "orcaState");
513+
}
514+
515+
@Override
516+
protected Subchannel delegate() {
517+
return delegate;
518+
}
519+
520+
@Override
521+
public void start(SubchannelStateListener listener) {
522+
orcaState.init(this, listener);
523+
super.start(orcaState);
524+
}
525+
}
526+
517527
/** Configuration for out-of-band ORCA reporting service RPC. */
518528
public static final class OrcaReportingConfig {
519529

@@ -570,24 +580,4 @@ public OrcaReportingConfig build() {
570580
}
571581
}
572582
}
573-
574-
/**
575-
* A {@code SubchannelBoundListener} binds a {@link OrcaOobReportListener} instance to an {@link
576-
* Subchannel} so that the listener always invokes {@link OrcaOobReportListener#onLoadReport} with
577-
* the subchannel it binds to.
578-
*/
579-
private static class SubchannelBoundListener {
580-
581-
private final Subchannel subchannel;
582-
private final OrcaOobReportListener listener;
583-
584-
SubchannelBoundListener(Subchannel subchannel, OrcaOobReportListener listener) {
585-
this.subchannel = checkNotNull(subchannel, "subchannel");
586-
this.listener = checkNotNull(listener, "listener");
587-
}
588-
589-
void onLoadReport(OrcaLoadReport report) {
590-
listener.onLoadReport(subchannel, report);
591-
}
592-
}
593583
}

0 commit comments

Comments
 (0)