Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
309 changes: 303 additions & 6 deletions api/src/main/java/io/grpc/LoadBalancer.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,14 @@

package io.grpc;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;

import com.google.common.base.MoreObjects;
import com.google.common.base.Objects;
import com.google.common.base.Preconditions;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -329,15 +331,22 @@ public boolean equals(Object obj) {
* <p>SHUTDOWN can only happen in two cases. One is that LoadBalancer called {@link
* Subchannel#shutdown} earlier, thus it should have already discarded this Subchannel. The other
* is that Channel is doing a {@link ManagedChannel#shutdownNow forced shutdown} or has already
* terminated, thus there won't be further requests to LoadBalancer. Therefore, SHUTDOWN can be
* safely ignored.
* terminated, thus there won't be further requests to LoadBalancer. Therefore, the LoadBalancer
* usually don't need to react to a SHUTDOWN state.
*
* @param subchannel the involved Subchannel
* @param stateInfo the new state
* @since 1.2.0
* @deprecated This method will be removed. Stop overriding it. Instead, pass {@link
* SubchannelStateListener} to {@link Subchannel#start} to receive Subchannel state
* updates
*/
public abstract void handleSubchannelState(
Subchannel subchannel, ConnectivityStateInfo stateInfo);
@Deprecated
public void handleSubchannelState(
Subchannel subchannel, ConnectivityStateInfo stateInfo) {
// Do nothing. If the implemetation doesn't implement this, it will get subchannel states from
// the new API. We don't throw because there may be forwarding LoadBalancers still plumb this.
}

/**
* The channel asks the load-balancer to shutdown. No more callbacks will be called after this
Expand Down Expand Up @@ -539,7 +548,7 @@ private PickResult(
* {@code handleSubchannelState}'s javadoc for more details.</li>
* </ol>
*
* @param subchannel the picked Subchannel
* @param subchannel the picked Subchannel. It must have been {@link Subchannel#start started}
* @param streamTracerFactory if not null, will be used to trace the activities of the stream
* created as a result of this pick. Note it's possible that no
* stream is created at all in some cases.
Expand Down Expand Up @@ -666,6 +675,218 @@ public boolean equals(Object other) {
}
}

/**
* Arguments for creating a {@link Subchannel}.
*
* @since 1.22.0
*/
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/1771")
public static final class CreateSubchannelArgs {
private final List<EquivalentAddressGroup> addrs;
private final Attributes attrs;
private final Object[][] customOptions;

private CreateSubchannelArgs(
List<EquivalentAddressGroup> addrs, Attributes attrs, Object[][] customOptions) {
this.addrs = checkNotNull(addrs, "addresses are not set");
this.attrs = checkNotNull(attrs, "attrs");
this.customOptions = checkNotNull(customOptions, "customOptions");
}

/**
* Returns the addresses, which is an unmodifiable list.
*/
public List<EquivalentAddressGroup> getAddresses() {
return addrs;
}

/**
* Returns the attributes.
*/
public Attributes getAttributes() {
return attrs;
}

/**
* Get the value for a custom option or its inherent default.
*
* @param key Key identifying option
*/
@SuppressWarnings("unchecked")
public <T> T getOption(Key<T> key) {
Preconditions.checkNotNull(key, "key");
for (int i = 0; i < customOptions.length; i++) {
if (key.equals(customOptions[i][0])) {
return (T) customOptions[i][1];
}
}
return key.defaultValue;
}

/**
* Returns a builder with the same initial values as this object.
*/
public Builder toBuilder() {
return newBuilder().setAddresses(addrs).setAttributes(attrs).copyCustomOptions(customOptions);
}

/**
* Creates a new builder.
*/
public static Builder newBuilder() {
return new Builder();
}

@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("addrs", addrs)
.add("attrs", attrs)
.add("customOptions", Arrays.deepToString(customOptions))
.toString();
}

@ExperimentalApi("https://github.com/grpc/grpc-java/issues/1771")
public static final class Builder {

private List<EquivalentAddressGroup> addrs;
private Attributes attrs = Attributes.EMPTY;
private Object[][] customOptions = new Object[0][2];

Builder() {
}

private <T> Builder copyCustomOptions(Object[][] options) {
customOptions = new Object[options.length][2];
System.arraycopy(options, 0, customOptions, 0, options.length);
return this;
}

/**
* Add a custom option. Any existing value for the key is overwritten.
*
* <p>This is an <strong>optional</strong> property.
*
* @param key the option key
* @param value the option value
*/
public <T> Builder addOption(Key<T> key, T value) {
Preconditions.checkNotNull(key, "key");
Preconditions.checkNotNull(value, "value");

int existingIdx = -1;
for (int i = 0; i < customOptions.length; i++) {
if (key.equals(customOptions[i][0])) {
existingIdx = i;
break;
}
}

if (existingIdx == -1) {
Object[][] newCustomOptions = new Object[customOptions.length + 1][2];
System.arraycopy(customOptions, 0, newCustomOptions, 0, customOptions.length);
customOptions = newCustomOptions;
existingIdx = customOptions.length - 1;
}
customOptions[existingIdx] = new Object[]{key, value};
return this;
}

/**
* The addresses to connect to. All addresses are considered equivalent and will be tried
* in the order they are provided.
*/
public Builder setAddresses(EquivalentAddressGroup addrs) {
this.addrs = Collections.singletonList(addrs);
return this;
}

/**
* The addresses to connect to. All addresses are considered equivalent and will
* be tried in the order they are provided.
*
* <p>This is a <strong>required</strong> property.
*
* @throws IllegalArgumentException if {@code addrs} is empty
*/
public Builder setAddresses(List<EquivalentAddressGroup> addrs) {
checkArgument(!addrs.isEmpty(), "addrs is empty");
this.addrs = Collections.unmodifiableList(new ArrayList<>(addrs));
return this;
}

/**
* Attributes provided here will be included in {@link Subchannel#getAttributes}.
*
* <p>This is an <strong>optional</strong> property. Default is empty if not set.
*/
public Builder setAttributes(Attributes attrs) {
this.attrs = checkNotNull(attrs, "attrs");
return this;
}

/**
* Creates a new args object.
*/
public CreateSubchannelArgs build() {
return new CreateSubchannelArgs(addrs, attrs, customOptions);
}
}

/**
* Key for a key-value pair. Uses reference equality.
*/
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/1771")
public static final class Key<T> {

private final String debugString;
private final T defaultValue;

private Key(String debugString, T defaultValue) {
this.debugString = debugString;
this.defaultValue = defaultValue;
}

/**
* Factory method for creating instances of {@link Key}. The default value of the key is
* {@code null}.
*
* @param debugString a debug string that describes this key.
* @param <T> Key type
* @return Key object
*/
public static <T> Key<T> create(String debugString) {
Preconditions.checkNotNull(debugString, "debugString");
return new Key<>(debugString, /*defaultValue=*/ null);
}

/**
* Factory method for creating instances of {@link Key}.
*
* @param debugString a debug string that describes this key.
* @param defaultValue default value to return when value for key not set
* @param <T> Key type
* @return Key object
*/
public static <T> Key<T> createWithDefault(String debugString, T defaultValue) {
Preconditions.checkNotNull(debugString, "debugString");
return new Key<>(debugString, defaultValue);
}

/**
* Returns the user supplied default value for this key.
*/
public T getDefault() {
return defaultValue;
}

@Override
public String toString() {
return debugString;
}
}
}

/**
* Provides essentials for LoadBalancer implementations.
*
Expand All @@ -679,7 +900,11 @@ public abstract static class Helper {
* EquivalentAddressGroup}.
*
* @since 1.2.0
* @deprecated Use {@link #createSubchannel(io.grpc.LoadBalancer.CreateSubchannelArgs)}
* instead. Note the new API must be called from {@link #getSynchronizationContext
* the Synchronization Context}.
*/
@Deprecated
public final Subchannel createSubchannel(EquivalentAddressGroup addrs, Attributes attrs) {
checkNotNull(addrs, "addrs");
return createSubchannel(Collections.singletonList(addrs), attrs);
Expand All @@ -700,11 +925,32 @@ public final Subchannel createSubchannel(EquivalentAddressGroup addrs, Attribute
*
* @throws IllegalArgumentException if {@code addrs} is empty
* @since 1.14.0
* @deprecated Use {@link #createSubchannel(io.grpc.LoadBalancer.CreateSubchannelArgs)}
* instead. Note the new API must be called from {@link #getSynchronizationContext
* the Synchronization Context}.
*/
@Deprecated
public Subchannel createSubchannel(List<EquivalentAddressGroup> addrs, Attributes attrs) {
throw new UnsupportedOperationException();
}

/**
* Creates a Subchannel, which is a logical connection to the given group of addresses which are
* considered equivalent. The {@code attrs} are custom attributes associated with this
* Subchannel, and can be accessed later through {@link Subchannel#getAttributes
* Subchannel.getAttributes()}.
*
* <p>The LoadBalancer is responsible for closing unused Subchannels, and closing all
* Subchannels within {@link #shutdown}.
*
* <p>It must be called from {@link #getSynchronizationContext the Synchronization Context}
*
* @since 1.22.0
*/
public Subchannel createSubchannel(CreateSubchannelArgs args) {
throw new UnsupportedOperationException();
}

/**
* Equivalent to {@link #updateSubchannelAddresses(io.grpc.LoadBalancer.Subchannel, List)} with
* the given single {@code EquivalentAddressGroup}.
Expand Down Expand Up @@ -925,14 +1171,35 @@ public NameResolverRegistry getNameResolverRegistry() {
* #requestConnection requestConnection()} can be used to ask Subchannel to create a transport if
* there isn't any.
*
* <p>{@link #start} must be called prior to calling any other methods, with the exception of
* {@link #shutdown}, which can be called at any time.
*
* @since 1.2.0
*/
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/1771")
public abstract static class Subchannel {
/**
* Starts the Subchannel. Can only be called once.
*
* <p>Must be called prior to any other method on this class, except for {@link #shutdown} which
* may be called at any time.
*
* <p>Must be called from the {@link Helper#getSynchronizationContext Synchronization Context},
* otherwise it may throw. See <a href="https://github.com/grpc/grpc-java/issues/5015">
* #5015</a> for more discussions.
*
* @param listener receives state updates for this Subchannel.
*/
public void start(SubchannelStateListener listener) {
throw new UnsupportedOperationException("Not implemented");
}

/**
* Shuts down the Subchannel. After this method is called, this Subchannel should no longer
* be returned by the latest {@link SubchannelPicker picker}, and can be safely discarded.
*
* <p>Calling it on an already shut-down Subchannel has no effect.
*
* <p>It should be called from the Synchronization Context. Currently will log a warning if
* violated. It will become an exception eventually. See <a
* href="https://github.com/grpc/grpc-java/issues/5015">#5015</a> for the background.
Expand Down Expand Up @@ -966,7 +1233,7 @@ public abstract static class Subchannel {
*/
public final EquivalentAddressGroup getAddresses() {
List<EquivalentAddressGroup> groups = getAllAddresses();
Preconditions.checkState(groups.size() == 1, "Does not have exactly one group");
Preconditions.checkState(groups.size() == 1, "%s does not have exactly one group", groups);
return groups.get(0);
}

Expand Down Expand Up @@ -1031,6 +1298,36 @@ public ChannelLogger getChannelLogger() {
}
}

/**
* Receives state changes for one {@link Subchannel}. All methods are run under {@link
* Helper#getSynchronizationContext}.
*
* @since 1.22.0
*/
public interface SubchannelStateListener {
/**
* Handles a state change on a Subchannel.
*
* <p>The initial state of a Subchannel is IDLE. You won't get a notification for the initial
* IDLE state.
*
* <p>If the new state is not SHUTDOWN, this method should create a new picker and call {@link
* Helper#updateBalancingState Helper.updateBalancingState()}. Failing to do so may result in
* unnecessary delays of RPCs. Please refer to {@link PickResult#withSubchannel
* PickResult.withSubchannel()}'s javadoc for more information.
*
* <p>SHUTDOWN can only happen in two cases. One is that LoadBalancer called {@link
* Subchannel#shutdown} earlier, thus it should have already discarded this Subchannel. The
* other is that Channel is doing a {@link ManagedChannel#shutdownNow forced shutdown} or has
* already terminated, thus there won't be further requests to LoadBalancer. Therefore, the
* LoadBalancer usually don't need to react to a SHUTDOWN state.
* @param newState the new state
*
* @since 1.22.0
*/
void onSubchannelState(ConnectivityStateInfo newState);
}

/**
* Factory to create {@link LoadBalancer} instance.
*
Expand Down
Loading