From 468412f3c65d66b4fd17a62e80187ecec77ace8c Mon Sep 17 00:00:00 2001 From: Aaron Wang Date: Mon, 22 May 2023 23:56:24 +0800 Subject: [PATCH 1/6] [Feature-13511] Submit Spark task directly on Kubernetes --- .../core/network/DataClientManager.java | 5 +- .../netty/ChannelFutureListenerOnWrite.java | 2 +- .../core/receiver/MessageRecvManager.java | 67 +++++++++++-------- .../core/sender/MessageSendManager.java | 6 +- .../computer/core/sender/MessageSender.java | 8 +++ .../core/sender/QueuedMessageSender.java | 10 +++ .../core/compute/MockMessageSender.java | 7 ++ 7 files changed, 69 insertions(+), 36 deletions(-) diff --git a/computer-core/src/main/java/org/apache/hugegraph/computer/core/network/DataClientManager.java b/computer-core/src/main/java/org/apache/hugegraph/computer/core/network/DataClientManager.java index 34f7043af..1bfe8c90b 100644 --- a/computer-core/src/main/java/org/apache/hugegraph/computer/core/network/DataClientManager.java +++ b/computer-core/src/main/java/org/apache/hugegraph/computer/core/network/DataClientManager.java @@ -118,10 +118,9 @@ public void onChannelInactive(ConnectionId connectionId) { @Override public void exceptionCaught(TransportException cause, ConnectionId connectionId) { - // TODO: implement failover - LOG.error("Channel for connectionId {} occurred exception", + LOG.error("Channel for connectionId {} occurred exception, shutdown all clients", connectionId, cause); - DataClientManager.this.connManager.closeClient(connectionId); + DataClientManager.this.sender.exceptionCaught(cause, connectionId); } } } diff --git a/computer-core/src/main/java/org/apache/hugegraph/computer/core/network/netty/ChannelFutureListenerOnWrite.java b/computer-core/src/main/java/org/apache/hugegraph/computer/core/network/netty/ChannelFutureListenerOnWrite.java index c37155740..9e54a1693 100644 --- a/computer-core/src/main/java/org/apache/hugegraph/computer/core/network/netty/ChannelFutureListenerOnWrite.java +++ b/computer-core/src/main/java/org/apache/hugegraph/computer/core/network/netty/ChannelFutureListenerOnWrite.java @@ -55,7 +55,7 @@ public void onDone(Channel channel, ChannelFuture future) { } } - public void onSuccess(Channel channel, ChannelFuture future) { + public void onSuccess(Channel channel, ChannelFuture future) { if (LOG.isDebugEnabled()) { LOG.debug("Successfully send data to '{}'", TransportUtil.remoteAddress(channel)); diff --git a/computer-core/src/main/java/org/apache/hugegraph/computer/core/receiver/MessageRecvManager.java b/computer-core/src/main/java/org/apache/hugegraph/computer/core/receiver/MessageRecvManager.java index 5e51320ad..dd9b1833f 100644 --- a/computer-core/src/main/java/org/apache/hugegraph/computer/core/receiver/MessageRecvManager.java +++ b/computer-core/src/main/java/org/apache/hugegraph/computer/core/receiver/MessageRecvManager.java @@ -18,8 +18,9 @@ package org.apache.hugegraph.computer.core.receiver; import java.util.Map; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.hugegraph.computer.core.common.ComputerContext; import org.apache.hugegraph.computer.core.common.Constants; @@ -60,7 +61,10 @@ public class MessageRecvManager implements Manager, MessageHandler { private int workerCount; private int expectedFinishMessages; - private CountDownLatch finishMessagesLatch; + private CompletableFuture finishAllMessagesFuture; + private CompletableFuture[] finishMessageFutures; + private AtomicInteger finishMessageCount; + private long waitFinishMessagesTimeout; private long superstep; @@ -90,8 +94,9 @@ public void init(Config config) { this.workerCount = config.get(ComputerOptions.JOB_WORKERS_COUNT); // One for vertex and one for edge. this.expectedFinishMessages = this.workerCount * 2; - this.finishMessagesLatch = new CountDownLatch( - this.expectedFinishMessages); + + initFinishMessageFuture(); + this.waitFinishMessagesTimeout = config.get( ComputerOptions.WORKER_WAIT_FINISH_MESSAGES_TIMEOUT); } @@ -103,8 +108,9 @@ public void beforeSuperstep(Config config, int superstep) { this.messagePartitions = new ComputeMessageRecvPartitions( this.context, fileGenerator, this.sortManager); this.expectedFinishMessages = this.workerCount; - this.finishMessagesLatch = new CountDownLatch( - this.expectedFinishMessages); + + initFinishMessageFuture(); + this.superstep = superstep; if (this.superstep == Constants.INPUT_SUPERSTEP + 1) { @@ -138,35 +144,37 @@ public void onChannelInactive(ConnectionId connectionId) { @Override public void exceptionCaught(TransportException cause, ConnectionId connectionId) { - // TODO: implement failover - LOG.warn("Exception caught for connection:{}, root cause:", + LOG.error("Exception caught for connection:{}, root cause:", connectionId, cause); + if (this.finishAllMessagesFuture != null) { + this.finishAllMessagesFuture.completeExceptionally(cause); + } + } + + private void initFinishMessageFuture() { + this.finishMessageFutures = new CompletableFuture[this.expectedFinishMessages]; + for (int i = 0; i < this.expectedFinishMessages; i++) { + this.finishMessageFutures[i] = new CompletableFuture<>(); + } + this.finishAllMessagesFuture = CompletableFuture.allOf(finishMessageFutures); + this.finishMessageCount = new AtomicInteger(0); } public void waitReceivedAllMessages() { - try { - boolean status = this.finishMessagesLatch.await( - this.waitFinishMessagesTimeout, + if (finishAllMessagesFuture != null) { + try { + finishAllMessagesFuture.get( + this.waitFinishMessagesTimeout, TimeUnit.MILLISECONDS); - if (!status) { + } catch (Exception e) { throw new ComputerException( - "Expect %s finish-messages received in %s ms, " + - "%s absence in superstep %s", - this.expectedFinishMessages, - this.waitFinishMessagesTimeout, - this.finishMessagesLatch.getCount(), - this.superstep); + "Thread is interrupted while waiting %s " + + "finish-messages received in %s ms in superstep %s", + e, + this.expectedFinishMessages, + this.waitFinishMessagesTimeout, + this.superstep); } - } catch (InterruptedException e) { - throw new ComputerException( - "Thread is interrupted while waiting %s " + - "finish-messages received in %s ms, " + - "%s absence in superstep %s", - e, - this.expectedFinishMessages, - this.waitFinishMessagesTimeout, - this.finishMessagesLatch.getCount(), - this.superstep); } } @@ -214,7 +222,8 @@ public void onStarted(ConnectionId connectionId) { @Override public void onFinished(ConnectionId connectionId) { LOG.debug("ConnectionId {} finished", connectionId); - this.finishMessagesLatch.countDown(); + int messageIdx = this.finishMessageCount.getAndIncrement(); + this.finishMessageFutures[messageIdx].complete(true); } /** diff --git a/computer-core/src/main/java/org/apache/hugegraph/computer/core/sender/MessageSendManager.java b/computer-core/src/main/java/org/apache/hugegraph/computer/core/sender/MessageSendManager.java index ee981c025..48fa687fd 100644 --- a/computer-core/src/main/java/org/apache/hugegraph/computer/core/sender/MessageSendManager.java +++ b/computer-core/src/main/java/org/apache/hugegraph/computer/core/sender/MessageSendManager.java @@ -154,7 +154,7 @@ public void startSend(MessageType type) { } /** - * Finsih send message, send the last buffer and put an END signal + * Finish send message, send the last buffer and put an END signal * into queue * @param type the message type */ @@ -277,10 +277,10 @@ private void sendControlMessageToWorkers(Set workerIds, } } catch (TimeoutException e) { throw new ComputerException("Timeout(%sms) to wait for " + - "controling message(%s) to finished", + "controlling message(%s) to finished", e, timeout, type); } catch (InterruptedException | ExecutionException e) { - throw new ComputerException("Failed to wait for controling " + + throw new ComputerException("Failed to wait for controlling " + "message(%s) to finished", e, type); } } diff --git a/computer-core/src/main/java/org/apache/hugegraph/computer/core/sender/MessageSender.java b/computer-core/src/main/java/org/apache/hugegraph/computer/core/sender/MessageSender.java index 864951e82..bfdd78844 100644 --- a/computer-core/src/main/java/org/apache/hugegraph/computer/core/sender/MessageSender.java +++ b/computer-core/src/main/java/org/apache/hugegraph/computer/core/sender/MessageSender.java @@ -19,6 +19,8 @@ import java.util.concurrent.CompletableFuture; +import org.apache.hugegraph.computer.core.common.exception.TransportException; +import org.apache.hugegraph.computer.core.network.ConnectionId; import org.apache.hugegraph.computer.core.network.message.MessageType; public interface MessageSender { @@ -37,4 +39,10 @@ CompletableFuture send(int workerId, MessageType type) * @param message message payload */ void send(int workerId, QueuedMessage message) throws InterruptedException; + + /** + * Invoked when the channel associated with the given connectionId has + * an exception is thrown processing message. + */ + void exceptionCaught(TransportException cause, ConnectionId connectionId); } diff --git a/computer-core/src/main/java/org/apache/hugegraph/computer/core/sender/QueuedMessageSender.java b/computer-core/src/main/java/org/apache/hugegraph/computer/core/sender/QueuedMessageSender.java index 810d3b9a6..f2ad12a69 100644 --- a/computer-core/src/main/java/org/apache/hugegraph/computer/core/sender/QueuedMessageSender.java +++ b/computer-core/src/main/java/org/apache/hugegraph/computer/core/sender/QueuedMessageSender.java @@ -24,6 +24,7 @@ import org.apache.hugegraph.computer.core.common.exception.TransportException; import org.apache.hugegraph.computer.core.config.ComputerOptions; import org.apache.hugegraph.computer.core.config.Config; +import org.apache.hugegraph.computer.core.network.ConnectionId; import org.apache.hugegraph.computer.core.network.TransportClient; import org.apache.hugegraph.computer.core.network.message.MessageType; import org.apache.hugegraph.concurrent.BarrierEvent; @@ -103,6 +104,15 @@ public void send(int workerId, QueuedMessage message) channel.queue.put(message); } + @Override + public void exceptionCaught(TransportException cause, ConnectionId connectionId) { + for (WorkerChannel channel : this.channels) { + if (channel.client.connectionId().equals(connectionId)) { + channel.futureRef.get().completeExceptionally(cause); + } + } + } + public Runnable notBusyNotifier() { /* * DataClientHandler.sendAvailable() will call it when client diff --git a/computer-test/src/main/java/org/apache/hugegraph/computer/core/compute/MockMessageSender.java b/computer-test/src/main/java/org/apache/hugegraph/computer/core/compute/MockMessageSender.java index 3535fd8a7..09f866f66 100644 --- a/computer-test/src/main/java/org/apache/hugegraph/computer/core/compute/MockMessageSender.java +++ b/computer-test/src/main/java/org/apache/hugegraph/computer/core/compute/MockMessageSender.java @@ -19,6 +19,8 @@ import java.util.concurrent.CompletableFuture; +import org.apache.hugegraph.computer.core.common.exception.TransportException; +import org.apache.hugegraph.computer.core.network.ConnectionId; import org.apache.hugegraph.computer.core.network.message.MessageType; import org.apache.hugegraph.computer.core.sender.MessageSender; import org.apache.hugegraph.computer.core.sender.QueuedMessage; @@ -36,4 +38,9 @@ public CompletableFuture send(int workerId, MessageType type) { public void send(int workerId, QueuedMessage message) { // pass } + + @Override + public void exceptionCaught(TransportException cause, ConnectionId connectionId) { + // pass + } } From eebe859ef3a5798d0b5e4e2ce71e9d69edba2a9d Mon Sep 17 00:00:00 2001 From: Aaron Wang Date: Tue, 23 May 2023 17:20:38 +0800 Subject: [PATCH 2/6] remove future array --- .../core/network/DataClientManager.java | 4 +-- .../core/receiver/MessageRecvManager.java | 36 ++++++++----------- .../computer/core/sender/MessageSender.java | 2 +- .../core/sender/QueuedMessageSender.java | 2 +- .../core/compute/MockMessageSender.java | 2 +- 5 files changed, 20 insertions(+), 26 deletions(-) diff --git a/computer-core/src/main/java/org/apache/hugegraph/computer/core/network/DataClientManager.java b/computer-core/src/main/java/org/apache/hugegraph/computer/core/network/DataClientManager.java index 1bfe8c90b..8e90c7946 100644 --- a/computer-core/src/main/java/org/apache/hugegraph/computer/core/network/DataClientManager.java +++ b/computer-core/src/main/java/org/apache/hugegraph/computer/core/network/DataClientManager.java @@ -118,9 +118,9 @@ public void onChannelInactive(ConnectionId connectionId) { @Override public void exceptionCaught(TransportException cause, ConnectionId connectionId) { - LOG.error("Channel for connectionId {} occurred exception, shutdown all clients", + LOG.error("Channel for connectionId {} occurred exception", connectionId, cause); - DataClientManager.this.sender.exceptionCaught(cause, connectionId); + DataClientManager.this.sender.transportExceptionCaught(cause, connectionId); } } } diff --git a/computer-core/src/main/java/org/apache/hugegraph/computer/core/receiver/MessageRecvManager.java b/computer-core/src/main/java/org/apache/hugegraph/computer/core/receiver/MessageRecvManager.java index dd9b1833f..44db49666 100644 --- a/computer-core/src/main/java/org/apache/hugegraph/computer/core/receiver/MessageRecvManager.java +++ b/computer-core/src/main/java/org/apache/hugegraph/computer/core/receiver/MessageRecvManager.java @@ -61,8 +61,7 @@ public class MessageRecvManager implements Manager, MessageHandler { private int workerCount; private int expectedFinishMessages; - private CompletableFuture finishAllMessagesFuture; - private CompletableFuture[] finishMessageFutures; + private CompletableFuture finishMessagesFuture; private AtomicInteger finishMessageCount; private long waitFinishMessagesTimeout; @@ -75,6 +74,7 @@ public MessageRecvManager(ComputerContext context, this.fileManager = fileManager; this.sortManager = sortManager; this.superstep = Constants.INPUT_SUPERSTEP; + this.finishMessageCount = new AtomicInteger(); } @Override @@ -94,8 +94,8 @@ public void init(Config config) { this.workerCount = config.get(ComputerOptions.JOB_WORKERS_COUNT); // One for vertex and one for edge. this.expectedFinishMessages = this.workerCount * 2; - - initFinishMessageFuture(); + this.finishMessagesFuture = new CompletableFuture<>(); + this.finishMessageCount.set(this.expectedFinishMessages); this.waitFinishMessagesTimeout = config.get( ComputerOptions.WORKER_WAIT_FINISH_MESSAGES_TIMEOUT); @@ -108,8 +108,11 @@ public void beforeSuperstep(Config config, int superstep) { this.messagePartitions = new ComputeMessageRecvPartitions( this.context, fileGenerator, this.sortManager); this.expectedFinishMessages = this.workerCount; + this.finishMessagesFuture = new CompletableFuture<>(); + if (!this.finishMessageCount.compareAndSet(0, this.expectedFinishMessages)) { + throw new ComputerException("The origin count must be 0"); + } - initFinishMessageFuture(); this.superstep = superstep; @@ -146,24 +149,13 @@ public void exceptionCaught(TransportException cause, ConnectionId connectionId) { LOG.error("Exception caught for connection:{}, root cause:", connectionId, cause); - if (this.finishAllMessagesFuture != null) { - this.finishAllMessagesFuture.completeExceptionally(cause); - } - } - - private void initFinishMessageFuture() { - this.finishMessageFutures = new CompletableFuture[this.expectedFinishMessages]; - for (int i = 0; i < this.expectedFinishMessages; i++) { - this.finishMessageFutures[i] = new CompletableFuture<>(); - } - this.finishAllMessagesFuture = CompletableFuture.allOf(finishMessageFutures); - this.finishMessageCount = new AtomicInteger(0); + this.finishMessagesFuture.completeExceptionally(cause); } public void waitReceivedAllMessages() { - if (finishAllMessagesFuture != null) { + if (finishMessagesFuture != null) { try { - finishAllMessagesFuture.get( + finishMessagesFuture.get( this.waitFinishMessagesTimeout, TimeUnit.MILLISECONDS); } catch (Exception e) { @@ -222,8 +214,10 @@ public void onStarted(ConnectionId connectionId) { @Override public void onFinished(ConnectionId connectionId) { LOG.debug("ConnectionId {} finished", connectionId); - int messageIdx = this.finishMessageCount.getAndIncrement(); - this.finishMessageFutures[messageIdx].complete(true); + int messageIdx = this.finishMessageCount.decrementAndGet(); + if (messageIdx == 0) { + this.finishMessagesFuture.complete(null); + } } /** diff --git a/computer-core/src/main/java/org/apache/hugegraph/computer/core/sender/MessageSender.java b/computer-core/src/main/java/org/apache/hugegraph/computer/core/sender/MessageSender.java index bfdd78844..a700b22c9 100644 --- a/computer-core/src/main/java/org/apache/hugegraph/computer/core/sender/MessageSender.java +++ b/computer-core/src/main/java/org/apache/hugegraph/computer/core/sender/MessageSender.java @@ -44,5 +44,5 @@ CompletableFuture send(int workerId, MessageType type) * Invoked when the channel associated with the given connectionId has * an exception is thrown processing message. */ - void exceptionCaught(TransportException cause, ConnectionId connectionId); + void transportExceptionCaught(TransportException cause, ConnectionId connectionId); } diff --git a/computer-core/src/main/java/org/apache/hugegraph/computer/core/sender/QueuedMessageSender.java b/computer-core/src/main/java/org/apache/hugegraph/computer/core/sender/QueuedMessageSender.java index f2ad12a69..b2006b886 100644 --- a/computer-core/src/main/java/org/apache/hugegraph/computer/core/sender/QueuedMessageSender.java +++ b/computer-core/src/main/java/org/apache/hugegraph/computer/core/sender/QueuedMessageSender.java @@ -105,7 +105,7 @@ public void send(int workerId, QueuedMessage message) } @Override - public void exceptionCaught(TransportException cause, ConnectionId connectionId) { + public void transportExceptionCaught(TransportException cause, ConnectionId connectionId) { for (WorkerChannel channel : this.channels) { if (channel.client.connectionId().equals(connectionId)) { channel.futureRef.get().completeExceptionally(cause); diff --git a/computer-test/src/main/java/org/apache/hugegraph/computer/core/compute/MockMessageSender.java b/computer-test/src/main/java/org/apache/hugegraph/computer/core/compute/MockMessageSender.java index 09f866f66..a19692008 100644 --- a/computer-test/src/main/java/org/apache/hugegraph/computer/core/compute/MockMessageSender.java +++ b/computer-test/src/main/java/org/apache/hugegraph/computer/core/compute/MockMessageSender.java @@ -40,7 +40,7 @@ public void send(int workerId, QueuedMessage message) { } @Override - public void exceptionCaught(TransportException cause, ConnectionId connectionId) { + public void transportExceptionCaught(TransportException cause, ConnectionId connectionId) { // pass } } From d53a67f429687eadd42e0b44ed3f804332ecf374 Mon Sep 17 00:00:00 2001 From: Aaron Wang Date: Tue, 23 May 2023 17:31:18 +0800 Subject: [PATCH 3/6] remove judge non-null --- .../core/receiver/MessageRecvManager.java | 37 +++++++++---------- 1 file changed, 17 insertions(+), 20 deletions(-) diff --git a/computer-core/src/main/java/org/apache/hugegraph/computer/core/receiver/MessageRecvManager.java b/computer-core/src/main/java/org/apache/hugegraph/computer/core/receiver/MessageRecvManager.java index 44db49666..e7a75d015 100644 --- a/computer-core/src/main/java/org/apache/hugegraph/computer/core/receiver/MessageRecvManager.java +++ b/computer-core/src/main/java/org/apache/hugegraph/computer/core/receiver/MessageRecvManager.java @@ -62,7 +62,7 @@ public class MessageRecvManager implements Manager, MessageHandler { private int workerCount; private int expectedFinishMessages; private CompletableFuture finishMessagesFuture; - private AtomicInteger finishMessageCount; + private AtomicInteger finishMessagesCount; private long waitFinishMessagesTimeout; private long superstep; @@ -74,7 +74,7 @@ public MessageRecvManager(ComputerContext context, this.fileManager = fileManager; this.sortManager = sortManager; this.superstep = Constants.INPUT_SUPERSTEP; - this.finishMessageCount = new AtomicInteger(); + this.finishMessagesCount = new AtomicInteger(); } @Override @@ -95,7 +95,7 @@ public void init(Config config) { // One for vertex and one for edge. this.expectedFinishMessages = this.workerCount * 2; this.finishMessagesFuture = new CompletableFuture<>(); - this.finishMessageCount.set(this.expectedFinishMessages); + this.finishMessagesCount.set(this.expectedFinishMessages); this.waitFinishMessagesTimeout = config.get( ComputerOptions.WORKER_WAIT_FINISH_MESSAGES_TIMEOUT); @@ -109,11 +109,10 @@ public void beforeSuperstep(Config config, int superstep) { this.context, fileGenerator, this.sortManager); this.expectedFinishMessages = this.workerCount; this.finishMessagesFuture = new CompletableFuture<>(); - if (!this.finishMessageCount.compareAndSet(0, this.expectedFinishMessages)) { + if (!this.finishMessagesCount.compareAndSet(0, this.expectedFinishMessages)) { throw new ComputerException("The origin count must be 0"); } - this.superstep = superstep; if (this.superstep == Constants.INPUT_SUPERSTEP + 1) { @@ -153,20 +152,18 @@ public void exceptionCaught(TransportException cause, } public void waitReceivedAllMessages() { - if (finishMessagesFuture != null) { - try { - finishMessagesFuture.get( - this.waitFinishMessagesTimeout, - TimeUnit.MILLISECONDS); - } catch (Exception e) { - throw new ComputerException( - "Thread is interrupted while waiting %s " + - "finish-messages received in %s ms in superstep %s", - e, - this.expectedFinishMessages, - this.waitFinishMessagesTimeout, - this.superstep); - } + try { + finishMessagesFuture.get( + this.waitFinishMessagesTimeout, + TimeUnit.MILLISECONDS); + } catch (Exception e) { + throw new ComputerException( + "Thread is interrupted while waiting %s " + + "finish-messages received in %s ms in superstep %s", + e, + this.expectedFinishMessages, + this.waitFinishMessagesTimeout, + this.superstep); } } @@ -214,7 +211,7 @@ public void onStarted(ConnectionId connectionId) { @Override public void onFinished(ConnectionId connectionId) { LOG.debug("ConnectionId {} finished", connectionId); - int messageIdx = this.finishMessageCount.decrementAndGet(); + int messageIdx = this.finishMessagesCount.decrementAndGet(); if (messageIdx == 0) { this.finishMessagesFuture.complete(null); } From fc30ecb75ce881dce541a9de78fd7b8bf2b36e1d Mon Sep 17 00:00:00 2001 From: Aaron Wang Date: Tue, 23 May 2023 21:47:21 +0800 Subject: [PATCH 4/6] improve: align error message & catch timeout exception individually --- .../core/receiver/MessageRecvManager.java | 25 ++++++++----------- 1 file changed, 11 insertions(+), 14 deletions(-) diff --git a/computer-core/src/main/java/org/apache/hugegraph/computer/core/receiver/MessageRecvManager.java b/computer-core/src/main/java/org/apache/hugegraph/computer/core/receiver/MessageRecvManager.java index e7a75d015..b3f44a82c 100644 --- a/computer-core/src/main/java/org/apache/hugegraph/computer/core/receiver/MessageRecvManager.java +++ b/computer-core/src/main/java/org/apache/hugegraph/computer/core/receiver/MessageRecvManager.java @@ -18,8 +18,10 @@ package org.apache.hugegraph.computer.core.receiver; import java.util.Map; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; import org.apache.hugegraph.computer.core.common.ComputerContext; @@ -109,9 +111,7 @@ public void beforeSuperstep(Config config, int superstep) { this.context, fileGenerator, this.sortManager); this.expectedFinishMessages = this.workerCount; this.finishMessagesFuture = new CompletableFuture<>(); - if (!this.finishMessagesCount.compareAndSet(0, this.expectedFinishMessages)) { - throw new ComputerException("The origin count must be 0"); - } + this.finishMessagesCount.set(this.expectedFinishMessages); this.superstep = superstep; @@ -153,17 +153,13 @@ public void exceptionCaught(TransportException cause, public void waitReceivedAllMessages() { try { - finishMessagesFuture.get( - this.waitFinishMessagesTimeout, - TimeUnit.MILLISECONDS); - } catch (Exception e) { - throw new ComputerException( - "Thread is interrupted while waiting %s " + - "finish-messages received in %s ms in superstep %s", - e, - this.expectedFinishMessages, - this.waitFinishMessagesTimeout, - this.superstep); + this.finishMessagesFuture.get(this.waitFinishMessagesTimeout, TimeUnit.MILLISECONDS); + } catch (TimeoutException e) { + throw new ComputerException("Time out while waiting %s finish-messages received in %s ms in superstep %s", + this.expectedFinishMessages, this.waitFinishMessagesTimeout, this.superstep, e); + } catch (InterruptedException | ExecutionException e) { + throw new ComputerException("Error while waiting %s finish-messages received in %s ms in superstep %s", + this.expectedFinishMessages, this.waitFinishMessagesTimeout, this.superstep, e); } } @@ -214,6 +210,7 @@ public void onFinished(ConnectionId connectionId) { int messageIdx = this.finishMessagesCount.decrementAndGet(); if (messageIdx == 0) { this.finishMessagesFuture.complete(null); + this.finishMessagesCount.set(0); } } From b15b31db8962c02309eb80c2098462a409e61ba5 Mon Sep 17 00:00:00 2001 From: Aaron Wang Date: Tue, 23 May 2023 22:38:11 +0800 Subject: [PATCH 5/6] fix style --- .../computer/core/receiver/MessageRecvManager.java | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/computer-core/src/main/java/org/apache/hugegraph/computer/core/receiver/MessageRecvManager.java b/computer-core/src/main/java/org/apache/hugegraph/computer/core/receiver/MessageRecvManager.java index b3f44a82c..61b4061c9 100644 --- a/computer-core/src/main/java/org/apache/hugegraph/computer/core/receiver/MessageRecvManager.java +++ b/computer-core/src/main/java/org/apache/hugegraph/computer/core/receiver/MessageRecvManager.java @@ -155,10 +155,12 @@ public void waitReceivedAllMessages() { try { this.finishMessagesFuture.get(this.waitFinishMessagesTimeout, TimeUnit.MILLISECONDS); } catch (TimeoutException e) { - throw new ComputerException("Time out while waiting %s finish-messages received in %s ms in superstep %s", + throw new ComputerException("Time out while waiting %s finish-messages " + + "received in %s ms in superstep %s", this.expectedFinishMessages, this.waitFinishMessagesTimeout, this.superstep, e); } catch (InterruptedException | ExecutionException e) { - throw new ComputerException("Error while waiting %s finish-messages received in %s ms in superstep %s", + throw new ComputerException("Error while waiting %s finish-messages " + + "received in %s ms in superstep %s", this.expectedFinishMessages, this.waitFinishMessagesTimeout, this.superstep, e); } } @@ -207,10 +209,9 @@ public void onStarted(ConnectionId connectionId) { @Override public void onFinished(ConnectionId connectionId) { LOG.debug("ConnectionId {} finished", connectionId); - int messageIdx = this.finishMessagesCount.decrementAndGet(); - if (messageIdx == 0) { + int currentCount = this.finishMessagesCount.decrementAndGet(); + if (currentCount == 0) { this.finishMessagesFuture.complete(null); - this.finishMessagesCount.set(0); } } From 26aca448934fd90f5cfd2bcb3a6a75bfdd99b0e7 Mon Sep 17 00:00:00 2001 From: Aaron Wang Date: Wed, 24 May 2023 22:58:36 +0800 Subject: [PATCH 6/6] ignore waitFinishMessagesTimeout info in error msg --- .../hugegraph/computer/core/receiver/MessageRecvManager.java | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/computer-core/src/main/java/org/apache/hugegraph/computer/core/receiver/MessageRecvManager.java b/computer-core/src/main/java/org/apache/hugegraph/computer/core/receiver/MessageRecvManager.java index 61b4061c9..b77ffa807 100644 --- a/computer-core/src/main/java/org/apache/hugegraph/computer/core/receiver/MessageRecvManager.java +++ b/computer-core/src/main/java/org/apache/hugegraph/computer/core/receiver/MessageRecvManager.java @@ -159,9 +159,8 @@ public void waitReceivedAllMessages() { "received in %s ms in superstep %s", this.expectedFinishMessages, this.waitFinishMessagesTimeout, this.superstep, e); } catch (InterruptedException | ExecutionException e) { - throw new ComputerException("Error while waiting %s finish-messages " + - "received in %s ms in superstep %s", - this.expectedFinishMessages, this.waitFinishMessagesTimeout, this.superstep, e); + throw new ComputerException("Error while waiting %s finish-messages in superstep %s", + this.expectedFinishMessages, this.superstep, e); } }