Skip to content

Commit 9bd8fda

Browse files
committed
Apply additional polishing to ClusterCommandExecutor.
See spring-projects#2518
1 parent 07cebd4 commit 9bd8fda

File tree

1 file changed

+16
-26
lines changed

1 file changed

+16
-26
lines changed

src/main/java/org/springframework/data/redis/connection/ClusterCommandExecutor.java

Lines changed: 16 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
package org.springframework.data.redis.connection;
1717

1818
import java.util.*;
19-
import java.util.Map.Entry;
2019
import java.util.concurrent.Callable;
2120
import java.util.concurrent.ExecutionException;
2221
import java.util.concurrent.Future;
@@ -227,48 +226,39 @@ public <S, T> MultiNodeResult<T> executeCommandAsyncOnNodes(ClusterCommandCallba
227226

228227
<T> MultiNodeResult<T> collectResults(Map<NodeExecution, Future<NodeResult<T>>> futures) {
229228

230-
NodeExceptionCollector exceptionCollector = new NodeExceptionCollector();
231229
MultiNodeResult<T> result = new MultiNodeResult<>();
232-
Object placeholder = new Object();
233-
Map<Future<NodeResult<T>>, Object> safeguard = new IdentityHashMap<>();
230+
NodeExceptionCollector exceptionCollector = new NodeExceptionCollector();
231+
232+
OUT: while (!futures.isEmpty()) {
234233

235-
for (;;) {
234+
Iterator<Map.Entry<NodeExecution, Future<NodeResult<T>>>> entryIterator = futures.entrySet().iterator();
236235

237-
boolean timeout = false;
238-
for (Map.Entry<NodeExecution, Future<NodeResult<T>>> entry : futures.entrySet()) {
236+
while (entryIterator.hasNext()) {
239237

238+
Map.Entry<NodeExecution, Future<NodeResult<T>>> entry = entryIterator.next();
240239
NodeExecution nodeExecution = entry.getKey();
241240
Future<NodeResult<T>> futureNodeResult = entry.getValue();
242241

243242
try {
243+
NodeResult<T> nodeResult = futureNodeResult.get(10L, TimeUnit.MICROSECONDS);
244244

245-
if (!safeguard.containsKey(futureNodeResult)) {
246-
247-
NodeResult<T> nodeResult = futureNodeResult.get(10L, TimeUnit.MICROSECONDS);
248-
249-
if (nodeExecution.isPositional()) {
250-
result.add(nodeExecution.getPositionalKey(), nodeResult);
251-
} else {
252-
result.add(nodeResult);
253-
}
254-
255-
safeguard.put(futureNodeResult, placeholder);
245+
if (nodeExecution.isPositional()) {
246+
result.add(nodeExecution.getPositionalKey(), nodeResult);
247+
} else {
248+
result.add(nodeResult);
256249
}
250+
251+
entryIterator.remove();
257252
} catch (ExecutionException exception) {
258-
safeguard.put(futureNodeResult, placeholder);
253+
entryIterator.remove();
259254
exceptionCollector.addException(nodeExecution, exception.getCause());
260255
} catch (TimeoutException ignore) {
261-
timeout = true;
262256
} catch (InterruptedException exception) {
263257
Thread.currentThread().interrupt();
264258
exceptionCollector.addException(nodeExecution, exception);
265-
break;
259+
break OUT;
266260
}
267261
}
268-
269-
if (!timeout) {
270-
break;
271-
}
272262
}
273263

274264
if (exceptionCollector.hasExceptions()) {
@@ -300,7 +290,7 @@ public <S, T> MultiNodeResult<T> executeMultiKeyCommand(MultiKeyClusterCommandCa
300290

301291
Map<NodeExecution, Future<NodeResult<T>>> futures = new LinkedHashMap<>();
302292

303-
for (Entry<RedisClusterNode, PositionalKeys> entry : nodeKeyMap.entrySet()) {
293+
for (Map.Entry<RedisClusterNode, PositionalKeys> entry : nodeKeyMap.entrySet()) {
304294

305295
if (entry.getKey().isMaster()) {
306296
for (PositionalKey key : entry.getValue()) {

0 commit comments

Comments
 (0)