Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
219 changes: 216 additions & 3 deletions core/src/main/java/io/grpc/LoadBalancer.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package io.grpc;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;

import com.google.common.base.MoreObjects;
Expand Down Expand Up @@ -332,9 +333,17 @@ public boolean equals(Object obj) {
* @param subchannel the involved Subchannel
* @param stateInfo the new state
* @since 1.2.0
* @deprecated This method will be removed. Stop overriding it. Instead, pass {@link
* SubchannelStateListener} to {@link Helper#createSubchannel(List, Attributes,
* SubchannelStateListener)} or {@link Helper#createSubchannel(EquivalentAddressGroup,
* Attributes, SubchannelStateListener)} to receive Subchannel state updates
*/
public abstract void handleSubchannelState(
Subchannel subchannel, ConnectivityStateInfo stateInfo);
@Deprecated
public void handleSubchannelState(
Subchannel subchannel, ConnectivityStateInfo stateInfo) {
// Do nothing. If the implemetation doesn't implement this, it will get subchannel states from
// the new API. We don't throw because there may be forwarding LoadBalancers still plumb this.
}

/**
* The channel asks the load-balancer to shutdown. No more callbacks will be called after this
Expand Down Expand Up @@ -648,6 +657,149 @@ public boolean equals(Object other) {
}
}

/**
* Arguments for {@link Helper#createSubchannel(CreateSubchannelArgs)}.
*
* @since 1.21.0
*/
public static final class CreateSubchannelArgs {
private final List<EquivalentAddressGroup> addrs;
private final Attributes attrs;
private final SubchannelStateListener stateListener;

private CreateSubchannelArgs(
List<EquivalentAddressGroup> addrs, Attributes attrs,
SubchannelStateListener stateListener) {
this.addrs = checkNotNull(addrs, "addresses are not set");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This needs to be a copy

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I made Builder copy it.

this.attrs = checkNotNull(attrs, "attrs");
this.stateListener = checkNotNull(stateListener, "SubchannelStateListener is not set");
}

/**
* Returns the addresses, which is an unmodifiable list.
*/
public List<EquivalentAddressGroup> getAddresses() {
return addrs;
}

/**
* Returns the attributes.
*/
public Attributes getAttributes() {
return attrs;
}

/**
* Returns the state listener.
*/
public SubchannelStateListener getStateListener() {
return stateListener;
}

/**
* Returns a builder with the same initial values as this object.
*/
public Builder toBuilder() {
return newBuilder().setAddresses(addrs).setAttributes(attrs).setStateListener(stateListener);
}

/**
* Creates a new builder.
*/
public static Builder newBuilder() {
return new Builder();
}

@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("addrs", addrs)
.add("attrs", attrs)
.add("listener", stateListener)
.toString();
}

@Override
public int hashCode() {
return Objects.hashCode(addrs, attrs, stateListener);
}

/**
* Returns true if the {@link Subchannel}, {@link Status}, and
* {@link ClientStreamTracer.Factory} all match.
*/
@Override
public boolean equals(Object other) {
if (!(other instanceof CreateSubchannelArgs)) {
return false;
}
CreateSubchannelArgs that = (CreateSubchannelArgs) other;
return Objects.equal(addrs, that.addrs) && Objects.equal(attrs, that.attrs)
&& Objects.equal(stateListener, that.stateListener);
}

public static final class Builder {
private List<EquivalentAddressGroup> addrs;
private Attributes attrs = Attributes.EMPTY;
private SubchannelStateListener stateListener;

Builder() {
}

/**
* The addresses to connect to. All addresses are considered equivalent and will be tried
* in the order they are provided.
*/
public Builder setAddresses(EquivalentAddressGroup addrs) {
this.addrs = Collections.singletonList(addrs);
return this;
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

make Builders ctor package private

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

/**
* The addresses to connect to. All addresses are considered equivalent and will
* be tried in the order they are provided.
*
* <p>This is a <strong>required</strong> property.
*
* @throws IllegalArgumentException if {@code addrs} is empty
*/
public Builder setAddresses(List<EquivalentAddressGroup> addrs) {
checkArgument(!addrs.isEmpty(), "addrs is empty");
this.addrs = Collections.unmodifiableList(new ArrayList<>(addrs));
return this;
}

/**
* Attributes provided here will be included in {@link Subchannel#getAttributes}.
*
* <p>This is an <strong>optional</strong> property. Default is empty if not set.
*/
public Builder setAttributes(Attributes attrs) {
this.attrs = checkNotNull(attrs, "attrs");
return this;
}

/**
* Receives state changes of the created Subchannel. The listener is called from
* the {@link #getSynchronizationContext Synchronization Context}. It's safe to share the
* listener among multiple Subchannels.
*
* <p>This is a <strong>required</strong> property.
*/
public Builder setStateListener(SubchannelStateListener listener) {
this.stateListener = checkNotNull(listener, "listener");
return this;
}

/**
* Creates a new args object.
*/
public CreateSubchannelArgs build() {
return new CreateSubchannelArgs(addrs, attrs, stateListener);
}
}
}

/**
* Provides essentials for LoadBalancer implementations.
*
Expand All @@ -661,7 +813,11 @@ public abstract static class Helper {
* EquivalentAddressGroup}.
*
* @since 1.2.0
* @deprecated Use {@link #createSubchannel(CreateSubchannelArgs)} instead. Note the new API
* must be called from {@link #getSynchronizationContext the Synchronization
* Context}.
*/
@Deprecated
public final Subchannel createSubchannel(EquivalentAddressGroup addrs, Attributes attrs) {
checkNotNull(addrs, "addrs");
return createSubchannel(Collections.singletonList(addrs), attrs);
Expand All @@ -682,11 +838,35 @@ public final Subchannel createSubchannel(EquivalentAddressGroup addrs, Attribute
*
* @throws IllegalArgumentException if {@code addrs} is empty
* @since 1.14.0
* @deprecated Use {@link #createSubchannel(CreateSubchannelArgs)} instead. Note the new API
* must be called from {@link #getSynchronizationContext the Synchronization
* Context}.
*/
@Deprecated
public Subchannel createSubchannel(List<EquivalentAddressGroup> addrs, Attributes attrs) {
throw new UnsupportedOperationException();
}

/**
* Creates a Subchannel, which is a logical connection to the given group of addresses which are
* considered equivalent. The {@code attrs} are custom attributes associated with this
* Subchannel, and can be accessed later through {@link Subchannel#getAttributes
* Subchannel.getAttributes()}.
*
* <p>This method <strong>must be called from the {@link #getSynchronizationContext
* Synchronization Context}</strong>, otherwise it may throw. This is to avoid the race between
* the caller and {@link SubchannelStateListener#onSubchannelState}. See <a
* href="https://github.com/grpc/grpc-java/issues/5015">#5015</a> for more discussions.
*
* <p>The LoadBalancer is responsible for closing unused Subchannels, and closing all
* Subchannels within {@link #shutdown}.
*
* @since 1.21.0
*/
public Subchannel createSubchannel(CreateSubchannelArgs args) {
throw new UnsupportedOperationException();
}

/**
* Equivalent to {@link #updateSubchannelAddresses(io.grpc.LoadBalancer.Subchannel, List)} with
* the given single {@code EquivalentAddressGroup}.
Expand Down Expand Up @@ -903,7 +1083,7 @@ public abstract static class Subchannel {
*/
public final EquivalentAddressGroup getAddresses() {
List<EquivalentAddressGroup> groups = getAllAddresses();
Preconditions.checkState(groups.size() == 1, "Does not have exactly one group");
Preconditions.checkState(groups.size() == 1, "%s does not have exactly one group", groups);
return groups.get(0);
}

Expand Down Expand Up @@ -964,6 +1144,39 @@ public ChannelLogger getChannelLogger() {
}
}

/**
* Receives state changes for one or more {@link Subchannel}s. All methods are run under {@link
* Helper#getSynchronizationContext}.
*
* @since 1.21.0
*/
public interface SubchannelStateListener {

/**
* Handles a state change on a Subchannel.
*
* <p>The initial state of a Subchannel is IDLE. You won't get a notification for the initial
* IDLE state.
*
* <p>If the new state is not SHUTDOWN, this method should create a new picker and call {@link
* Helper#updateBalancingState Helper.updateBalancingState()}. Failing to do so may result in
* unnecessary delays of RPCs. Please refer to {@link PickResult#withSubchannel
* PickResult.withSubchannel()}'s javadoc for more information.
*
* <p>SHUTDOWN can only happen in two cases. One is that LoadBalancer called {@link
* Subchannel#shutdown} earlier, thus it should have already discarded this Subchannel. The
* other is that Channel is doing a {@link ManagedChannel#shutdownNow forced shutdown} or has
* already terminated, thus there won't be further requests to LoadBalancer. Therefore,
* SHUTDOWN can be safely ignored.
*
* @param subchannel the involved Subchannel
* @param newState the new state
*
* @since 1.21.0
*/
void onSubchannelState(Subchannel subchannel, ConnectivityStateInfo newState);
}

/**
* Factory to create {@link LoadBalancer} instance.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,6 @@ public void handleResolvedAddresses(ResolvedAddresses resolvedAddresses) {}
@Override
public void handleNameResolutionError(Status error) {}

@Override
public void handleSubchannelState(Subchannel subchannel, ConnectivityStateInfo stateInfo) {}

@Override
public void shutdown() {}
}
Expand Down Expand Up @@ -165,6 +162,7 @@ public void handleNameResolutionError(Status error) {
getDelegate().handleNameResolutionError(error);
}

@Deprecated
@Override
public void handleSubchannelState(Subchannel subchannel, ConnectivityStateInfo stateInfo) {
getDelegate().handleSubchannelState(subchannel, stateInfo);
Expand Down
33 changes: 27 additions & 6 deletions core/src/main/java/io/grpc/internal/ManagedChannelImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import io.grpc.InternalLogId;
import io.grpc.InternalWithLogId;
import io.grpc.LoadBalancer;
import io.grpc.LoadBalancer.CreateSubchannelArgs;
import io.grpc.LoadBalancer.PickResult;
import io.grpc.LoadBalancer.PickSubchannelArgs;
import io.grpc.LoadBalancer.ResolvedAddresses;
Expand Down Expand Up @@ -1049,6 +1050,7 @@ private void handleInternalSubchannelState(ConnectivityStateInfo newState) {
}
}

@Deprecated
@Override
public AbstractSubchannel createSubchannel(
List<EquivalentAddressGroup> addressGroups, Attributes attrs) {
Expand All @@ -1060,17 +1062,36 @@ public AbstractSubchannel createSubchannel(
+ " Otherwise, it may race with handleSubchannelState()."
+ " See https://github.com/grpc/grpc-java/issues/5015", e);
}
checkNotNull(addressGroups, "addressGroups");
checkNotNull(attrs, "attrs");
return createSubchannelInternal(
CreateSubchannelArgs.newBuilder()
.setAddresses(addressGroups)
.setAttributes(attrs)
.setStateListener(new LoadBalancer.SubchannelStateListener() {
@Override
public void onSubchannelState(
LoadBalancer.Subchannel subchannel, ConnectivityStateInfo newState) {
lb.handleSubchannelState(subchannel, newState);
}
})
.build());
}

@Override
public AbstractSubchannel createSubchannel(CreateSubchannelArgs args) {
syncContext.throwIfNotInThisSynchronizationContext();
return createSubchannelInternal(args);
}

private AbstractSubchannel createSubchannelInternal(final CreateSubchannelArgs args) {
// TODO(ejona): can we be even stricter? Like loadBalancer == null?
checkState(!terminated, "Channel is terminated");
final SubchannelImpl subchannel = new SubchannelImpl(attrs);
final SubchannelImpl subchannel = new SubchannelImpl(args.getAttributes());
long subchannelCreationTime = timeProvider.currentTimeNanos();
InternalLogId subchannelLogId = InternalLogId.allocate("Subchannel", /*details=*/ null);
ChannelTracer subchannelTracer =
new ChannelTracer(
subchannelLogId, maxTraceEvents, subchannelCreationTime,
"Subchannel for " + addressGroups);
"Subchannel for " + args.getAddresses());

final class ManagedInternalSubchannelCallback extends InternalSubchannel.Callback {
// All callbacks are run in syncContext
Expand All @@ -1086,7 +1107,7 @@ void onStateChange(InternalSubchannel is, ConnectivityStateInfo newState) {
handleInternalSubchannelState(newState);
// Call LB only if it's not shutdown. If LB is shutdown, lbHelper won't match.
if (LbHelperImpl.this == ManagedChannelImpl.this.lbHelper) {
lb.handleSubchannelState(subchannel, newState);
args.getStateListener().onSubchannelState(subchannel, newState);
}
}

Expand All @@ -1102,7 +1123,7 @@ void onNotInUse(InternalSubchannel is) {
}

final InternalSubchannel internalSubchannel = new InternalSubchannel(
addressGroups,
args.getAddresses(),
authority(),
userAgent,
backoffPolicyProvider,
Expand Down
Loading