Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 14 additions & 49 deletions stub/src/main/java/io/grpc/stub/ClientCalls.java
Original file line number Diff line number Diff line change
Expand Up @@ -133,9 +133,7 @@ public static <ReqT, RespT> StreamObserver<ReqT> asyncBidiStreamingCall(
public static <ReqT, RespT> RespT blockingUnaryCall(ClientCall<ReqT, RespT> call, ReqT req) {
try {
return getUnchecked(futureUnaryCall(call, req));
} catch (RuntimeException e) {
throw cancelThrow(call, e);
} catch (Error e) {
} catch (RuntimeException | Error e) {
throw cancelThrow(call, e);
}
}
Expand Down Expand Up @@ -167,10 +165,7 @@ public static <ReqT, RespT> RespT blockingUnaryCall(
}
executor.shutdown();
return getUnchecked(responseFuture);
} catch (RuntimeException e) {
// Something very bad happened. All bets are off; it may be dangerous to wait for onClose().
throw cancelThrow(call, e);
} catch (Error e) {
} catch (RuntimeException | Error e) {
// Something very bad happened. All bets are off; it may be dangerous to wait for onClose().
throw cancelThrow(call, e);
} finally {
Expand Down Expand Up @@ -206,14 +201,12 @@ public static <ReqT, RespT> Iterator<RespT> blockingServerStreamingCall(
*
* @return an iterator over the response stream.
*/
// TODO(louiscryan): Not clear if we want to use this idiom for 'simple' stubs.
public static <ReqT, RespT> Iterator<RespT> blockingServerStreamingCall(
Channel channel, MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, ReqT req) {
ThreadlessExecutor executor = new ThreadlessExecutor();
ClientCall<ReqT, RespT> call = channel.newCall(method,
callOptions.withOption(ClientCalls.STUB_TYPE_OPTION, StubType.BLOCKING)
.withExecutor(executor));
BlockingResponseStream<RespT> result = new BlockingResponseStream<>(call, executor);
callOptions.withOption(ClientCalls.STUB_TYPE_OPTION, StubType.BLOCKING));

BlockingResponseStream<RespT> result = new BlockingResponseStream<>(call);
asyncUnaryRequestCall(call, req, result.listener());
return result;
}
Expand Down Expand Up @@ -288,8 +281,7 @@ private static StatusRuntimeException toStatusRuntimeException(Throwable t) {
private static RuntimeException cancelThrow(ClientCall<?, ?> call, Throwable t) {
try {
call.cancel(null, t);
} catch (Throwable e) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Assertion can be disabled. So this change may skip logging some error messages, its safer to not change it.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The assertion is checking for checked exceptions, which should't happen. Catching Throwable was because this was written before we could write RuntimeException | Error. I'm not worried about this change.

assert e instanceof RuntimeException || e instanceof Error;
} catch (RuntimeException | Error e) {
logger.log(Level.SEVERE, "RuntimeException encountered while closing call", e);
}
if (t instanceof RuntimeException) {
Expand Down Expand Up @@ -320,9 +312,7 @@ private static <ReqT, RespT> void asyncUnaryRequestCall(
try {
call.sendMessage(req);
call.halfClose();
} catch (RuntimeException e) {
throw cancelThrow(call, e);
} catch (Error e) {
} catch (RuntimeException | Error e) {
throw cancelThrow(call, e);
}
}
Expand Down Expand Up @@ -597,20 +587,12 @@ private static final class BlockingResponseStream<T> implements Iterator<T> {
private final BlockingQueue<Object> buffer = new ArrayBlockingQueue<>(3);
private final StartableListener<T> listener = new QueuingListener();
private final ClientCall<?, T> call;
/** May be null. */
private final ThreadlessExecutor threadless;
// Only accessed when iterating.
private Object last;

// Non private to avoid synthetic class
BlockingResponseStream(ClientCall<?, T> call) {
this(call, null);
}

// Non private to avoid synthetic class
BlockingResponseStream(ClientCall<?, T> call, ThreadlessExecutor threadless) {
this.call = call;
this.threadless = threadless;
}

StartableListener<T> listener() {
Expand All @@ -620,31 +602,14 @@ StartableListener<T> listener() {
private Object waitForNext() {
boolean interrupt = false;
try {
if (threadless == null) {
while (true) {
try {
return buffer.take();
} catch (InterruptedException ie) {
interrupt = true;
call.cancel("Thread interrupted", ie);
// Now wait for onClose() to be called, to guarantee BlockingQueue doesn't fill
}
}
} else {
Object next;
while ((next = buffer.poll()) == null) {
try {
threadless.waitAndDrain();
} catch (InterruptedException ie) {
interrupt = true;
call.cancel("Thread interrupted", ie);
// Now wait for onClose() to be called, so interceptors can clean up
}
}
if (next == this || next instanceof StatusRuntimeException) {
threadless.shutdown();
while (true) {
try {
return buffer.take();
} catch (InterruptedException ie) {
interrupt = true;
call.cancel("Thread interrupted", ie);
// Now wait for onClose() to be called, to guarantee BlockingQueue doesn't fill
}
return next;
}
} finally {
if (interrupt) {
Expand Down