Skip to content

Commit 7c28bd5

Browse files
committed
modify finishSend
1 parent 468412f commit 7c28bd5

File tree

8 files changed

+25
-27
lines changed

8 files changed

+25
-27
lines changed

computer-core/src/main/java/org/apache/hugegraph/computer/core/compute/ComputeManager.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -168,7 +168,7 @@ public WorkerStat compute(WorkerContext context, int superstep) {
168168
"partition parallel compute", t);
169169
}
170170

171-
this.sendManager.finishSend(MessageType.MSG);
171+
this.sendManager.finishSend(MessageType.MSG, true);
172172

173173
// After compute and send finish signal.
174174
Map<Integer, MessageStat> recvStats = this.recvManager.messageStats();

computer-core/src/main/java/org/apache/hugegraph/computer/core/input/WorkerInputManager.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -81,15 +81,15 @@ public void loadGraph() {
8181
Vertex vertex = iterator.next();
8282
this.sendManager.sendVertex(vertex);
8383
}
84-
this.sendManager.finishSend(MessageType.VERTEX);
84+
this.sendManager.finishSend(MessageType.VERTEX, false);
8585

8686
this.sendManager.startSend(MessageType.EDGE);
8787
iterator = this.loadService.createIteratorFromEdge();
8888
while (iterator.hasNext()) {
8989
Vertex vertex = iterator.next();
9090
this.sendManager.sendEdge(vertex);
9191
}
92-
this.sendManager.finishSend(MessageType.EDGE);
92+
this.sendManager.finishSend(MessageType.EDGE, true);
9393
this.sendManager.clearBuffer();
9494
}
9595
}

computer-core/src/main/java/org/apache/hugegraph/computer/core/network/DataClientManager.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -118,9 +118,9 @@ public void onChannelInactive(ConnectionId connectionId) {
118118
@Override
119119
public void exceptionCaught(TransportException cause,
120120
ConnectionId connectionId) {
121-
LOG.error("Channel for connectionId {} occurred exception, shutdown all clients",
121+
LOG.error("Channel for connectionId {} occurred exception",
122122
connectionId, cause);
123-
DataClientManager.this.sender.exceptionCaught(cause, connectionId);
123+
DataClientManager.this.sender.transportExceptionCaught(cause, connectionId);
124124
}
125125
}
126126
}

computer-core/src/main/java/org/apache/hugegraph/computer/core/receiver/MessageRecvManager.java

Lines changed: 13 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,6 @@ public class MessageRecvManager implements Manager, MessageHandler {
6363
private int expectedFinishMessages;
6464
private CompletableFuture<Void> finishAllMessagesFuture;
6565
private CompletableFuture<Boolean>[] finishMessageFutures;
66-
private AtomicInteger finishMessageCount;
6766

6867
private long waitFinishMessagesTimeout;
6968
private long superstep;
@@ -92,10 +91,14 @@ public void init(Config config) {
9291
this.edgePartitions = new EdgeMessageRecvPartitions(
9392
this.context, fileGenerator, this.sortManager);
9493
this.workerCount = config.get(ComputerOptions.JOB_WORKERS_COUNT);
95-
// One for vertex and one for edge.
96-
this.expectedFinishMessages = this.workerCount * 2;
9794

98-
initFinishMessageFuture();
95+
this.expectedFinishMessages = this.workerCount;
96+
this.finishMessageFutures = new CompletableFuture[this.expectedFinishMessages];
97+
98+
for (int i = 0; i < this.expectedFinishMessages; i++) {
99+
this.finishMessageFutures[i] = new CompletableFuture<>();
100+
}
101+
this.finishAllMessagesFuture = CompletableFuture.allOf(finishMessageFutures);
99102

100103
this.waitFinishMessagesTimeout = config.get(
101104
ComputerOptions.WORKER_WAIT_FINISH_MESSAGES_TIMEOUT);
@@ -107,9 +110,12 @@ public void beforeSuperstep(Config config, int superstep) {
107110
this.fileManager, superstep);
108111
this.messagePartitions = new ComputeMessageRecvPartitions(
109112
this.context, fileGenerator, this.sortManager);
110-
this.expectedFinishMessages = this.workerCount;
111113

112-
initFinishMessageFuture();
114+
for (int i = 0; i < this.expectedFinishMessages; i++) {
115+
this.finishMessageFutures[i] = new CompletableFuture<>();
116+
}
117+
this.finishAllMessagesFuture = CompletableFuture.allOf(finishMessageFutures);
118+
113119

114120
this.superstep = superstep;
115121

@@ -151,15 +157,6 @@ public void exceptionCaught(TransportException cause,
151157
}
152158
}
153159

154-
private void initFinishMessageFuture() {
155-
this.finishMessageFutures = new CompletableFuture[this.expectedFinishMessages];
156-
for (int i = 0; i < this.expectedFinishMessages; i++) {
157-
this.finishMessageFutures[i] = new CompletableFuture<>();
158-
}
159-
this.finishAllMessagesFuture = CompletableFuture.allOf(finishMessageFutures);
160-
this.finishMessageCount = new AtomicInteger(0);
161-
}
162-
163160
public void waitReceivedAllMessages() {
164161
if (finishAllMessagesFuture != null) {
165162
try {
@@ -222,8 +219,7 @@ public void onStarted(ConnectionId connectionId) {
222219
@Override
223220
public void onFinished(ConnectionId connectionId) {
224221
LOG.debug("ConnectionId {} finished", connectionId);
225-
int messageIdx = this.finishMessageCount.getAndIncrement();
226-
this.finishMessageFutures[messageIdx].complete(true);
222+
this.finishMessageFutures[connectionId.clientIndex()].complete(true);
227223
}
228224

229225
/**

computer-core/src/main/java/org/apache/hugegraph/computer/core/sender/MessageSendManager.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -158,14 +158,16 @@ public void startSend(MessageType type) {
158158
* into queue
159159
* @param type the message type
160160
*/
161-
public void finishSend(MessageType type) {
161+
public void finishSend(MessageType type, boolean sendControlMessage) {
162162
Map<Integer, MessageSendPartition> all = this.buffers.all();
163163
MessageStat stat = this.sortAndSendLastBuffer(all, type);
164164

165165
Set<Integer> workerIds = all.keySet().stream()
166166
.map(this.partitioner::workerId)
167167
.collect(Collectors.toSet());
168-
this.sendControlMessageToWorkers(workerIds, MessageType.FINISH);
168+
if (sendControlMessage == true) {
169+
this.sendControlMessageToWorkers(workerIds, MessageType.FINISH);
170+
}
169171
LOG.info("Finish sending message(type={},count={},bytes={})",
170172
type, stat.messageCount(), stat.messageBytes());
171173
}

computer-core/src/main/java/org/apache/hugegraph/computer/core/sender/MessageSender.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,5 +44,5 @@ CompletableFuture<Void> send(int workerId, MessageType type)
4444
* Invoked when the channel associated with the given connectionId has
4545
* an exception is thrown processing message.
4646
*/
47-
void exceptionCaught(TransportException cause, ConnectionId connectionId);
47+
void transportExceptionCaught(TransportException cause, ConnectionId connectionId);
4848
}

computer-core/src/main/java/org/apache/hugegraph/computer/core/sender/QueuedMessageSender.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ public void send(int workerId, QueuedMessage message)
105105
}
106106

107107
@Override
108-
public void exceptionCaught(TransportException cause, ConnectionId connectionId) {
108+
public void transportExceptionCaught(TransportException cause, ConnectionId connectionId) {
109109
for (WorkerChannel channel : this.channels) {
110110
if (channel.client.connectionId().equals(connectionId)) {
111111
channel.futureRef.get().completeExceptionally(cause);

computer-test/src/main/java/org/apache/hugegraph/computer/core/compute/MockMessageSender.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ public void send(int workerId, QueuedMessage message) {
4040
}
4141

4242
@Override
43-
public void exceptionCaught(TransportException cause, ConnectionId connectionId) {
43+
public void transportExceptionCaught(TransportException cause, ConnectionId connectionId) {
4444
// pass
4545
}
4646
}

0 commit comments

Comments
 (0)