diff --git a/stub/src/main/java/io/grpc/stub/BlockingClientCall.java b/stub/src/main/java/io/grpc/stub/BlockingClientCall.java index 58881ef0592..b62bd4322a9 100644 --- a/stub/src/main/java/io/grpc/stub/BlockingClientCall.java +++ b/stub/src/main/java/io/grpc/stub/BlockingClientCall.java @@ -87,7 +87,7 @@ public final class BlockingClientCall { */ public RespT read() throws InterruptedException, StatusException { try { - return read(true, 0, TimeUnit.NANOSECONDS); + return read(true, 0); } catch (TimeoutException e) { throw new AssertionError("should never happen", e); } @@ -106,16 +106,14 @@ public RespT read() throws InterruptedException, StatusException { */ public RespT read(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException, StatusException { - return read(false, timeout, unit); + long endNanoTime = System.nanoTime() + unit.toNanos(timeout); + return read(false, endNanoTime); } - private RespT read(boolean waitForever, long timeout, TimeUnit unit) + private RespT read(boolean waitForever, long endNanoTime) throws InterruptedException, TimeoutException, StatusException { - long start = System.nanoTime(); - long end = start + unit.toNanos(timeout); - Predicate> predicate = BlockingClientCall::skipWaitingForRead; - executor.waitAndDrainWithTimeout(waitForever, end, predicate, this); + executor.waitAndDrainWithTimeout(waitForever, endNanoTime, predicate, this); RespT bufferedValue = buffer.poll(); if (logger.isLoggable(Level.FINER)) { @@ -182,7 +180,7 @@ public boolean hasNext() throws InterruptedException, StatusException { */ public boolean write(ReqT request) throws InterruptedException, StatusException { try { - return write(true, request, Integer.MAX_VALUE, TimeUnit.DAYS); + return write(true, request, 0); } catch (TimeoutException e) { throw new RuntimeException(e); // should never happen } @@ -211,21 +209,20 @@ public boolean write(ReqT request) throws InterruptedException, StatusException */ public boolean write(ReqT request, long timeout, TimeUnit unit) throws InterruptedException, TimeoutException, StatusException { - return write(false, request, timeout, unit); + long endNanoTime = System.nanoTime() + unit.toNanos(timeout); + return write(false, request, endNanoTime); } - private boolean write(boolean waitForever, ReqT request, long timeout, TimeUnit unit) + private boolean write(boolean waitForever, ReqT request, long endNanoTime) throws InterruptedException, TimeoutException, StatusException { if (writeClosed) { throw new IllegalStateException("Writes cannot be done after calling halfClose or cancel"); } - long end = System.nanoTime() + unit.toNanos(timeout); - Predicate> predicate = (x) -> x.call.isReady() || x.closedStatus != null; - executor.waitAndDrainWithTimeout(waitForever, end, predicate, this); + executor.waitAndDrainWithTimeout(waitForever, endNanoTime, predicate, this); Status savedClosedStatus = closedStatus; if (savedClosedStatus == null) { call.sendMessage(request);