5555import io .grpc .util .ForwardingClientStreamTracer ;
5656import io .grpc .util .ForwardingLoadBalancerHelper ;
5757import java .util .ArrayList ;
58- import java .util .Collection ;
59- import java .util .Collections ;
58+ import java .util .HashMap ;
6059import java .util .HashSet ;
6160import java .util .List ;
61+ import java .util .Map ;
6262import java .util .Set ;
6363import java .util .concurrent .ScheduledExecutorService ;
6464import java .util .concurrent .TimeUnit ;
@@ -327,18 +327,17 @@ public Subchannel createSubchannel(CreateSubchannelArgs args) {
327327 OrcaReportingState orcaState = args .getOption (ORCA_REPORTING_STATE_KEY );
328328 boolean augmented = false ;
329329 if (orcaState == null ) {
330- // Only the root load balanceing policy instantiate an OrcaReportingState instance to
331- // request for ORCA report from the backend that the delegated helper is trying to
332- // create subchannel to.
330+ // Only the first load balancing policy requesting ORCA reports instantiates an
331+ // OrcaReportingState.
333332 orcaState = new OrcaReportingState (
334333 this ,
335334 args .getStateListener (),
336335 syncContext ,
337336 delegate ().getScheduledExecutorService ());
338- orcaStates .add (orcaState );
339337 args = args .toBuilder ().addOption (ORCA_REPORTING_STATE_KEY , orcaState ).build ();
340338 augmented = true ;
341339 }
340+ orcaStates .add (orcaState );
342341 orcaState .listeners .add (this );
343342 Subchannel subchannel ;
344343 if (augmented ) {
@@ -348,16 +347,23 @@ public Subchannel createSubchannel(CreateSubchannelArgs args) {
348347 subchannel = super .createSubchannel (args );
349348 }
350349 if (orcaConfig != null ) {
351- orcaState .setReportingConfig (orcaConfig );
350+ orcaState .setReportingConfig (this , orcaConfig );
352351 }
353352 return subchannel ;
354353 }
355354
356- void setReportingConfig (OrcaReportingConfig config ) {
357- orcaConfig = config ;
358- for (OrcaReportingState state : orcaStates ) {
359- state .setReportingConfig (config );
360- }
355+ void setReportingConfig (final OrcaReportingConfig config ) {
356+ syncContext .execute (
357+ new Runnable () {
358+ @ Override
359+ public void run () {
360+ orcaConfig = config ;
361+ for (OrcaReportingState state : orcaStates ) {
362+ state .setReportingConfig (OrcaReportingHelper .this , config );
363+ }
364+ }
365+ }
366+ );
361367 }
362368
363369 @ Override
@@ -379,6 +385,7 @@ private final class OrcaReportingState implements SubchannelStateListener {
379385 private final SynchronizationContext syncContext ;
380386 private final ScheduledExecutorService timeService ;
381387 private final List <OrcaOobReportListener > listeners = new ArrayList <>();
388+ private final Map <OrcaReportingHelper , OrcaReportingConfig > configs = new HashMap <>();
382389 @ Nullable
383390 private Subchannel subchannel ;
384391 @ Nullable
@@ -418,25 +425,25 @@ void init(Subchannel subchannel) {
418425 this .subchannelLogger = checkNotNull (subchannel .getChannelLogger (), "subchannelLogger" );
419426 }
420427
421- void setReportingConfig (OrcaReportingConfig config ) {
428+ void setReportingConfig (OrcaReportingHelper helper , OrcaReportingConfig config ) {
422429 boolean reconfigured = false ;
423- // The overall config is the union of existing config and new config requested by some
424- // load balancing policy.
425- if (overallConfig != null ) {
426- if (config .reportIntervalNanos < overallConfig .reportIntervalNanos ) {
427- overallConfig .reportIntervalNanos = config .reportIntervalNanos ;
428- reconfigured = true ;
430+ configs .put (helper , config );
431+ // Real reporting interval is the minimum of intervals requested by all participating
432+ // helpers.
433+ if (overallConfig == null ) {
434+ overallConfig = config ;
435+ reconfigured = true ;
436+ } else {
437+ long minInterval = Long .MAX_VALUE ;
438+ for (OrcaReportingConfig c : configs .values ()) {
439+ if (c .reportIntervalNanos < minInterval ) {
440+ minInterval = c .reportIntervalNanos ;
441+ }
429442 }
430- if (!overallConfig .costNames .isEmpty ()
431- && !overallConfig .costNames .containsAll (config .costNames )) {
432- overallConfig .costNames .addAll (config .costNames );
443+ if (overallConfig .reportIntervalNanos != minInterval ) {
444+ overallConfig .reportIntervalNanos = minInterval ;
433445 reconfigured = true ;
434- } else {
435- overallConfig .costNames .clear ();
436446 }
437- } else {
438- overallConfig = config ;
439- reconfigured = true ;
440447 }
441448 if (reconfigured ) {
442449 stopRpc ("ORCA reporting reconfigured" );
@@ -502,6 +509,7 @@ public String toString() {
502509 return MoreObjects .toStringHelper (this )
503510 .add ("disabled" , disabled )
504511 .add ("orcaRpc" , orcaRpc )
512+ .add ("reportingConfig" , overallConfig )
505513 .add ("connectivityState" , state )
506514 .toString ();
507515 }
@@ -523,7 +531,6 @@ void start() {
523531 call .start (this , new Metadata ());
524532 call .sendMessage (OrcaLoadReportRequest .newBuilder ()
525533 .setReportInterval (Durations .fromNanos (overallConfig .getReportIntervalNanos ()))
526- .addAllRequestCostNames (overallConfig .getCostNames ())
527534 .build ());
528535 call .halfClose ();
529536 call .request (1 );
@@ -636,28 +643,18 @@ public long getReportIntervalNanos() {
636643 return reportIntervalNanos ;
637644 }
638645
639- /**
640- * Returns the set of configured cost metric names to be reported in ORCA report. If this is
641- * empty, all known requests costs tracked by the load reporting agent will be returned.
642- */
643- public Set <String > getCostNames () {
644- return Collections .unmodifiableSet (costNames );
645- }
646-
647646 /**
648647 * Returns a builder with the same initial values as this object.
649648 */
650649 public Builder toBuilder () {
651650 return newBuilder ()
652- .setReportInterval (reportIntervalNanos , TimeUnit .NANOSECONDS )
653- .addCostNames (costNames );
651+ .setReportInterval (reportIntervalNanos , TimeUnit .NANOSECONDS );
654652 }
655653
656654 @ Override
657655 public String toString () {
658656 return MoreObjects .toStringHelper (this )
659657 .add ("reportIntervalNanos" , reportIntervalNanos )
660- .add ("costNames" , costNames )
661658 .toString ();
662659 }
663660
@@ -682,28 +679,6 @@ public Builder setReportInterval(long reportInterval, TimeUnit unit) {
682679 return this ;
683680 }
684681
685- /**
686- * Adds a custom named backend metric to be reported. This provides an opportunity for the
687- * client to selectively obtain a subset of tracked costs.
688- *
689- * @param costName name for custom metric to be reported.
690- */
691- public Builder addCostName (String costName ) {
692- costNames .add (costName );
693- return this ;
694- }
695-
696- /**
697- * Adds a set of custom named backend metric to be reported. This provides an opportunity for
698- * the client to selectively obtain a subset of tracked costs.
699- *
700- * @param costNames collection of names for custom metric to be reported.
701- */
702- public Builder addCostNames (Collection <String > costNames ) {
703- this .costNames .addAll (costNames );
704- return this ;
705- }
706-
707682 /**
708683 * Creates a new {@link OrcaReportingConfig} object.
709684 */
0 commit comments