Commit 9f9bf13

LantaoJin authored and srowen committed
[SPARK-28160][CORE] Fix a bug that callback function may hang when unchecked exception missed
This is very similar to #23590. `ByteBuffer.allocate` may throw `OutOfMemoryError` when the response is large and not enough memory is available. When this happens, `TransportClient.sendRpcSync` hangs forever if the timeout is set to unlimited.

This PR catches `Throwable` and uses the error to complete the `SettableFuture`.

I tested in my IDE by setting the value of `size` to -1 to verify the result. Without this patch, the call does not finish until the timeout expires (it may hang forever if the timeout is set to MAX_INT); with this patch, the expected `IllegalArgumentException` is caught.

```java
@Override
public void onSuccess(ByteBuffer response) {
  try {
    int size = response.remaining();
    ByteBuffer copy = ByteBuffer.allocate(size); // set size to -1 at runtime when debugging
    copy.put(response);
    // flip "copy" to make it readable
    copy.flip();
    result.set(copy);
  } catch (Throwable t) {
    result.setException(t);
  }
}
```

Closes #24964 from LantaoJin/SPARK-28160.

Lead-authored-by: LantaoJin <[email protected]>
Co-authored-by: lajin <[email protected]>
Signed-off-by: Sean Owen <[email protected]>
(cherry picked from commit 0e42100)
Signed-off-by: Sean Owen <[email protected]>
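The failure mode described above can be reproduced outside Spark with a small, self-contained sketch. It is an illustration under stated assumptions, not the project's code: `java.util.concurrent.CompletableFuture` stands in for Guava's `SettableFuture`, and `ResponseCallback`, `CallbackHangSketch`, and the simulated I/O thread that swallows the escaping exception are hypothetical names and behavior introduced only for this example. As in the manual test from the commit message, a size of -1 is passed to `ByteBuffer.allocate` to force an `IllegalArgumentException`.

```java
import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class CallbackHangSketch {

  /** Hypothetical stand-in for the success half of an RPC response callback. */
  interface ResponseCallback {
    void onSuccess(ByteBuffer response);
  }

  /** Copies the response into the future; allocate(-1) simulates the allocation failure. */
  static void copyInto(CompletableFuture<ByteBuffer> result, ByteBuffer response) {
    ByteBuffer copy = ByteBuffer.allocate(-1); // throws IllegalArgumentException
    copy.put(response);
    copy.flip();
    result.complete(copy);
  }

  /** Builds the callback with or without the catch-all guard added by the patch. */
  static ResponseCallback callback(CompletableFuture<ByteBuffer> result, boolean guarded) {
    return response -> {
      if (!guarded) {
        copyInto(result, response); // exception escapes; the future is never completed
        return;
      }
      try {
        copyInto(result, response);
      } catch (Throwable t) {
        result.completeExceptionally(t); // the waiting caller is woken with the error
      }
    };
  }

  /** Runs the callback on a simulated I/O thread and reports what the waiting caller sees. */
  static String run(boolean guarded) {
    CompletableFuture<ByteBuffer> result = new CompletableFuture<>();
    ResponseCallback cb = callback(result, guarded);
    Thread io = new Thread(() -> {
      try {
        cb.onSuccess(ByteBuffer.wrap(new byte[] {1, 2, 3}));
      } catch (Throwable ignored) {
        // Assumption for this sketch: the I/O layer does not notify the caller.
      }
    });
    io.start();
    try {
      io.join();
      // A short timeout stands in for the "unlimited" timeout that would hang forever.
      result.get(200, TimeUnit.MILLISECONDS);
      return "completed";
    } catch (TimeoutException e) {
      return "timed out";
    } catch (ExecutionException e) {
      return "failed: " + e.getCause().getClass().getSimpleName();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return "interrupted";
    }
  }

  public static void main(String[] args) {
    System.out.println("unguarded: " + run(false)); // timed out
    System.out.println("guarded:   " + run(true));  // failed: IllegalArgumentException
  }
}
```

In the unguarded case the caller only returns because the sketch uses a 200 ms timeout; with an effectively unlimited timeout the same wait would never end, which is the hang this commit addresses.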
1 parent b477194 commit 9f9bf13

1 file changed: +10 -5 lines changed

common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java

Lines changed: 10 additions & 5 deletions
@@ -237,11 +237,16 @@ public ByteBuffer sendRpcSync(ByteBuffer message, long timeoutMs) {
     sendRpc(message, new RpcResponseCallback() {
       @Override
       public void onSuccess(ByteBuffer response) {
-        ByteBuffer copy = ByteBuffer.allocate(response.remaining());
-        copy.put(response);
-        // flip "copy" to make it readable
-        copy.flip();
-        result.set(copy);
+        try {
+          ByteBuffer copy = ByteBuffer.allocate(response.remaining());
+          copy.put(response);
+          // flip "copy" to make it readable
+          copy.flip();
+          result.set(copy);
+        } catch (Throwable t) {
+          logger.warn("Error in responding PRC callback", t);
+          result.setException(t);
+        }
       }
 
       @Override