Skip to content

stub: Only throw on cancellation for streaming responses #7173

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Oct 1, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 37 additions & 23 deletions stub/src/main/java/io/grpc/stub/ServerCalls.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ private ServerCalls() {
*/
public static <ReqT, RespT> ServerCallHandler<ReqT, RespT> asyncUnaryCall(
UnaryMethod<ReqT, RespT> method) {
return new UnaryServerCallHandler<>(method);
return new UnaryServerCallHandler<>(method, false);
}

/**
Expand All @@ -58,7 +58,7 @@ public static <ReqT, RespT> ServerCallHandler<ReqT, RespT> asyncUnaryCall(
*/
public static <ReqT, RespT> ServerCallHandler<ReqT, RespT> asyncServerStreamingCall(
ServerStreamingMethod<ReqT, RespT> method) {
return new UnaryServerCallHandler<>(method);
return new UnaryServerCallHandler<>(method, true);
}

/**
Expand All @@ -68,7 +68,7 @@ public static <ReqT, RespT> ServerCallHandler<ReqT, RespT> asyncServerStreamingC
*/
public static <ReqT, RespT> ServerCallHandler<ReqT, RespT> asyncClientStreamingCall(
ClientStreamingMethod<ReqT, RespT> method) {
return new StreamingServerCallHandler<>(method);
return new StreamingServerCallHandler<>(method, false);
}

/**
Expand All @@ -78,7 +78,7 @@ public static <ReqT, RespT> ServerCallHandler<ReqT, RespT> asyncClientStreamingC
*/
public static <ReqT, RespT> ServerCallHandler<ReqT, RespT> asyncBidiStreamingCall(
BidiStreamingMethod<ReqT, RespT> method) {
return new StreamingServerCallHandler<>(method);
return new StreamingServerCallHandler<>(method, true);
}

/**
Expand Down Expand Up @@ -113,10 +113,12 @@ private static final class UnaryServerCallHandler<ReqT, RespT>
implements ServerCallHandler<ReqT, RespT> {

private final UnaryRequestMethod<ReqT, RespT> method;
private final boolean serverStreaming;

// Non private to avoid synthetic class
UnaryServerCallHandler(UnaryRequestMethod<ReqT, RespT> method) {
UnaryServerCallHandler(UnaryRequestMethod<ReqT, RespT> method, boolean serverStreaming) {
this.method = method;
this.serverStreaming = serverStreaming;
}

@Override
Expand All @@ -125,7 +127,7 @@ public ServerCall.Listener<ReqT> startCall(ServerCall<ReqT, RespT> call, Metadat
call.getMethodDescriptor().getType().clientSendsOneMessage(),
"asyncUnaryRequestCall is only for clientSendsOneMessage methods");
ServerCallStreamObserverImpl<ReqT, RespT> responseObserver =
new ServerCallStreamObserverImpl<>(call);
new ServerCallStreamObserverImpl<>(call, serverStreaming);
// We expect only 1 request, but we ask for 2 requests here so that if a misbehaving client
// sends more than 1 requests, ServerCall will catch it. Note that disabling auto
// inbound flow control has no effect on unary calls.
Expand Down Expand Up @@ -189,9 +191,11 @@ public void onHalfClose() {

@Override
public void onCancel() {
responseObserver.cancelled = true;
if (responseObserver.onCancelHandler != null) {
responseObserver.onCancelHandler.run();
} else {
// Only trigger exceptions if unable to provide notification via a callback
responseObserver.cancelled = true;
}
}

Expand All @@ -209,16 +213,18 @@ private static final class StreamingServerCallHandler<ReqT, RespT>
implements ServerCallHandler<ReqT, RespT> {

private final StreamingRequestMethod<ReqT, RespT> method;
private final boolean bidi;

// Non private to avoid synthetic class
StreamingServerCallHandler(StreamingRequestMethod<ReqT, RespT> method) {
StreamingServerCallHandler(StreamingRequestMethod<ReqT, RespT> method, boolean bidi) {
this.method = method;
this.bidi = bidi;
}

@Override
public ServerCall.Listener<ReqT> startCall(ServerCall<ReqT, RespT> call, Metadata headers) {
ServerCallStreamObserverImpl<ReqT, RespT> responseObserver =
new ServerCallStreamObserverImpl<>(call);
new ServerCallStreamObserverImpl<>(call, bidi);
StreamObserver<ReqT> requestObserver = method.invoke(responseObserver);
responseObserver.freeze();
if (responseObserver.autoRequestEnabled) {
Expand Down Expand Up @@ -262,14 +268,19 @@ public void onHalfClose() {

@Override
public void onCancel() {
responseObserver.cancelled = true;
if (responseObserver.onCancelHandler != null) {
responseObserver.onCancelHandler.run();
} else {
// Only trigger exceptions if unable to provide notification via a callback. Even though
// onError would provide notification to the server, we still throw an error since there
// isn't a guaranteed callback available. If the cancellation happened in a different
// order the service could be surprised to see the exception.
responseObserver.cancelled = true;
}
if (!halfClosed) {
requestObserver.onError(
Status.CANCELLED
.withDescription("cancelled before receiving half close")
.withDescription("client cancelled")
.asRuntimeException());
}
}
Expand Down Expand Up @@ -300,6 +311,7 @@ private interface StreamingRequestMethod<ReqT, RespT> {
private static final class ServerCallStreamObserverImpl<ReqT, RespT>
extends ServerCallStreamObserver<RespT> {
final ServerCall<ReqT, RespT> call;
private final boolean serverStreamingOrBidi;
volatile boolean cancelled;
private boolean frozen;
private boolean autoRequestEnabled = true;
Expand All @@ -310,8 +322,9 @@ private static final class ServerCallStreamObserverImpl<ReqT, RespT>
private boolean completed = false;

// Non private to avoid synthetic class
ServerCallStreamObserverImpl(ServerCall<ReqT, RespT> call) {
ServerCallStreamObserverImpl(ServerCall<ReqT, RespT> call, boolean serverStreamingOrBidi) {
this.call = call;
this.serverStreamingOrBidi = serverStreamingOrBidi;
}

private void freeze() {
Expand All @@ -331,10 +344,17 @@ public void setCompression(String compression) {
@Override
public void onNext(RespT response) {
if (cancelled) {
if (onCancelHandler == null) {
throw Status.CANCELLED.withDescription("call already cancelled").asRuntimeException();
if (serverStreamingOrBidi) {
throw Status.CANCELLED
.withDescription("call already cancelled. "
+ "Use ServerCallStreamObserver.setOnCancelHandler() to disable this exception")
.asRuntimeException();
} else {
// We choose not to throw for unary responses. The exception is intended to stop servers
// from continuing processing, but for unary responses there is no further processing
// so throwing an exception would not provide a benefit and would increase application
// complexity.
}
return;
}
checkState(!aborted, "Stream was terminated by error, no further calls are allowed");
checkState(!completed, "Stream is already completed, no further calls are allowed");
Expand All @@ -357,14 +377,8 @@ public void onError(Throwable t) {

@Override
public void onCompleted() {
if (cancelled) {
if (onCancelHandler == null) {
throw Status.CANCELLED.withDescription("call already cancelled").asRuntimeException();
}
} else {
call.close(Status.OK, new Metadata());
completed = true;
}
call.close(Status.OK, new Metadata());
completed = true;
}

@Override
Expand Down
8 changes: 2 additions & 6 deletions stub/src/test/java/io/grpc/stub/ServerCallsTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -195,12 +195,8 @@ public StreamObserver<Integer> invoke(StreamObserver<Integer> responseObserver)
} catch (StatusRuntimeException expected) {
// Expected
}
try {
callObserver.get().onCompleted();
fail("Expected cancellation exception when onCallHandler not set");
} catch (StatusRuntimeException expected) {
// Expected
}
// No exception
callObserver.get().onCompleted();
}

@Test
Expand Down