15
15
*/
16
16
package org .springframework .data .redis .connection ;
17
17
18
- import java .util .ArrayList ;
19
- import java .util .Arrays ;
20
- import java .util .Collection ;
21
- import java .util .Collections ;
22
- import java .util .Comparator ;
23
- import java .util .HashMap ;
24
- import java .util .HashSet ;
25
- import java .util .Iterator ;
26
- import java .util .LinkedHashMap ;
27
- import java .util .List ;
28
- import java .util .Map ;
18
+ import java .util .*;
29
19
import java .util .Map .Entry ;
30
- import java .util .Objects ;
31
- import java .util .Random ;
32
- import java .util .Set ;
33
- import java .util .TreeMap ;
34
20
import java .util .concurrent .Callable ;
35
21
import java .util .concurrent .ExecutionException ;
36
22
import java .util .concurrent .Future ;
37
23
import java .util .concurrent .TimeUnit ;
38
24
import java .util .concurrent .TimeoutException ;
39
- import java .util .function .BiConsumer ;
40
25
import java .util .function .Function ;
41
26
import java .util .stream .Collectors ;
42
27
@@ -111,47 +96,46 @@ public ClusterCommandExecutor(ClusterTopologyProvider topologyProvider, ClusterN
111
96
/**
112
97
* Run {@link ClusterCommandCallback} on a random node.
113
98
*
114
- * @param clusterCommand must not be {@literal null}.
99
+ * @param commandCallback must not be {@literal null}.
115
100
* @return never {@literal null}.
116
101
*/
117
- public <T > NodeResult <T > executeCommandOnArbitraryNode (ClusterCommandCallback <?, T > clusterCommand ) {
102
+ public <T > NodeResult <T > executeCommandOnArbitraryNode (ClusterCommandCallback <?, T > commandCallback ) {
118
103
119
- Assert .notNull (clusterCommand , "ClusterCommandCallback must not be null" );
104
+ Assert .notNull (commandCallback , "ClusterCommandCallback must not be null" );
120
105
121
106
List <RedisClusterNode > nodes = new ArrayList <>(getClusterTopology ().getActiveNodes ());
122
107
123
108
RedisClusterNode arbitraryNode = nodes .get (new Random ().nextInt (nodes .size ()));
124
109
125
- return executeCommandOnSingleNode (clusterCommand , arbitraryNode );
110
+ return executeCommandOnSingleNode (commandCallback , arbitraryNode );
126
111
}
127
112
128
113
/**
129
114
* Run {@link ClusterCommandCallback} on given {@link RedisClusterNode}.
130
115
*
131
- * @param clusterCommand must not be {@literal null}.
116
+ * @param commandCallback must not be {@literal null}.
132
117
* @param node must not be {@literal null}.
133
118
* @return the {@link NodeResult} from the single, targeted {@link RedisClusterNode}.
134
119
* @throws IllegalArgumentException in case no resource can be acquired for given node.
135
120
*/
136
- public <S , T > NodeResult <T > executeCommandOnSingleNode (ClusterCommandCallback <S , T > clusterCommand ,
121
+ public <S , T > NodeResult <T > executeCommandOnSingleNode (ClusterCommandCallback <S , T > commandCallback ,
137
122
RedisClusterNode node ) {
138
123
139
- return executeCommandOnSingleNode (clusterCommand , node , 0 );
124
+ return executeCommandOnSingleNode (commandCallback , node , 0 );
140
125
}
141
126
142
- private <S , T > NodeResult <T > executeCommandOnSingleNode (ClusterCommandCallback <S , T > clusterCommand ,
127
+ private <S , T > NodeResult <T > executeCommandOnSingleNode (ClusterCommandCallback <S , T > commandCallback ,
143
128
RedisClusterNode node , int redirectCount ) {
144
129
145
- Assert .notNull (clusterCommand , "ClusterCommandCallback must not be null" );
130
+ Assert .notNull (commandCallback , "ClusterCommandCallback must not be null" );
146
131
Assert .notNull (node , "RedisClusterNode must not be null" );
147
132
148
133
if (redirectCount > this .maxRedirects ) {
149
134
150
- String message = String .format ("Cannot follow Cluster Redirects over more than %s legs;"
151
- + " Please consider increasing the number of redirects to follow; Current value is: %s." ,
152
- redirectCount , this .maxRedirects );
153
-
154
- throw new TooManyClusterRedirectionsException (message );
135
+ throw new TooManyClusterRedirectionsException (String .format (
136
+ "Cannot follow Cluster Redirects over more than %s legs; "
137
+ + "Consider increasing the number of redirects to follow; Current value is: %s." ,
138
+ redirectCount , this .maxRedirects ));
155
139
}
156
140
157
141
RedisClusterNode nodeToUse = lookupNode (node );
@@ -161,15 +145,14 @@ private <S, T> NodeResult<T> executeCommandOnSingleNode(ClusterCommandCallback<S
161
145
Assert .notNull (client , "Could not acquire resource for node; Is your cluster info up to date" );
162
146
163
147
try {
164
- return new NodeResult <>(node , clusterCommand .doInCluster (client ));
148
+ return new NodeResult <>(node , commandCallback .doInCluster (client ));
165
149
} catch (RuntimeException cause ) {
166
150
167
151
RuntimeException translatedException = convertToDataAccessException (cause );
168
152
169
153
if (translatedException instanceof ClusterRedirectException clusterRedirectException ) {
170
- return executeCommandOnSingleNode (clusterCommand , topologyProvider .getTopology ()
171
- .lookup (clusterRedirectException .getTargetHost (), clusterRedirectException .getTargetPort ()),
172
- redirectCount + 1 );
154
+ return executeCommandOnSingleNode (commandCallback , topologyProvider .getTopology ().lookup (
155
+ clusterRedirectException .getTargetHost (), clusterRedirectException .getTargetPort ()), redirectCount + 1 );
173
156
} else {
174
157
throw translatedException != null ? translatedException : cause ;
175
158
}
@@ -182,7 +165,8 @@ private <S, T> NodeResult<T> executeCommandOnSingleNode(ClusterCommandCallback<S
182
165
* Lookup {@link RedisClusterNode node} from the {@link ClusterTopology topology}.
183
166
*
184
167
* @param node {@link RedisClusterNode node} to lookup; must not be {@literal null}.
185
- * @return the resolved {@link RedisClusterNode node} from the {@link ClusterTopology topology}; never {@literal null}.
168
+ * @return the resolved {@link RedisClusterNode node} from the {@link ClusterTopology topology}; never
169
+ * {@literal null}.
186
170
* @throws IllegalArgumentException in case the node could not be resolved to a topology-known node
187
171
*/
188
172
private RedisClusterNode lookupNode (RedisClusterNode node ) {
@@ -197,27 +181,27 @@ private RedisClusterNode lookupNode(RedisClusterNode node) {
197
181
/**
198
182
* Run {@link ClusterCommandCallback} on all reachable master nodes.
199
183
*
200
- * @param clusterCommand must not be {@literal null}.
184
+ * @param commandCallback must not be {@literal null}.
201
185
* @return never {@literal null}.
202
186
* @throws ClusterCommandExecutionFailureException if a failure occurs while executing the given
203
187
* {@link ClusterCommandCallback command} on any given {@link RedisClusterNode node}.
204
188
*/
205
- public <S , T > MultiNodeResult <T > executeCommandOnAllNodes (ClusterCommandCallback <S , T > clusterCommand ) {
206
- return executeCommandAsyncOnNodes (clusterCommand , getClusterTopology ().getActiveMasterNodes ());
189
+ public <S , T > MultiNodeResult <T > executeCommandOnAllNodes (ClusterCommandCallback <S , T > commandCallback ) {
190
+ return executeCommandAsyncOnNodes (commandCallback , getClusterTopology ().getActiveMasterNodes ());
207
191
}
208
192
209
193
/**
210
- * @param clusterCommand must not be {@literal null}.
194
+ * @param commandCallback must not be {@literal null}.
211
195
* @param nodes must not be {@literal null}.
212
196
* @return never {@literal null}.
213
197
* @throws ClusterCommandExecutionFailureException if a failure occurs while executing the given
214
198
* {@link ClusterCommandCallback command} on any given {@link RedisClusterNode node}.
215
199
* @throws IllegalArgumentException in case the node could not be resolved to a topology-known node
216
200
*/
217
- public <S , T > MultiNodeResult <T > executeCommandAsyncOnNodes (ClusterCommandCallback <S , T > clusterCommand ,
201
+ public <S , T > MultiNodeResult <T > executeCommandAsyncOnNodes (ClusterCommandCallback <S , T > commandCallback ,
218
202
Iterable <RedisClusterNode > nodes ) {
219
203
220
- Assert .notNull (clusterCommand , "Callback must not be null" );
204
+ Assert .notNull (commandCallback , "Callback must not be null" );
221
205
Assert .notNull (nodes , "Nodes must not be null" );
222
206
223
207
ClusterTopology topology = this .topologyProvider .getTopology ();
@@ -234,7 +218,7 @@ public <S, T> MultiNodeResult<T> executeCommandAsyncOnNodes(ClusterCommandCallba
234
218
Map <NodeExecution , Future <NodeResult <T >>> futures = new LinkedHashMap <>();
235
219
236
220
for (RedisClusterNode node : resolvedRedisClusterNodes ) {
237
- Callable <NodeResult <T >> nodeCommandExecution = () -> executeCommandOnSingleNode (clusterCommand , node );
221
+ Callable <NodeResult <T >> nodeCommandExecution = () -> executeCommandOnSingleNode (commandCallback , node );
238
222
futures .put (new NodeExecution (node ), executor .submit (nodeCommandExecution ));
239
223
}
240
224
@@ -243,26 +227,22 @@ public <S, T> MultiNodeResult<T> executeCommandAsyncOnNodes(ClusterCommandCallba
243
227
244
228
<T > MultiNodeResult <T > collectResults (Map <NodeExecution , Future <NodeResult <T >>> futures ) {
245
229
246
- Map < RedisClusterNode , Throwable > exceptions = new HashMap <> ();
230
+ NodeExceptionCollector exceptionCollector = new NodeExceptionCollector ();
247
231
MultiNodeResult <T > result = new MultiNodeResult <>();
248
- Set <String > safeguard = new HashSet <>();
232
+ Object placeholder = new Object ();
233
+ Map <Future <NodeResult <T >>, Object > safeguard = new IdentityHashMap <>();
249
234
250
- BiConsumer <NodeExecution , Throwable > exceptionHandler = getExceptionHandlerFunction (exceptions );
251
-
252
- boolean done = false ;
253
-
254
- while (!done ) {
255
-
256
- done = true ;
235
+ for (;;) {
257
236
237
+ boolean timeout = false ;
258
238
for (Map .Entry <NodeExecution , Future <NodeResult <T >>> entry : futures .entrySet ()) {
259
239
260
240
NodeExecution nodeExecution = entry .getKey ();
261
241
Future <NodeResult <T >> futureNodeResult = entry .getValue ();
262
- String futureId = ObjectUtils .getIdentityHexString (futureNodeResult );
263
242
264
243
try {
265
- if (!safeguard .contains (futureId )) {
244
+
245
+ if (!safeguard .containsKey (futureNodeResult )) {
266
246
267
247
NodeResult <T > nodeResult = futureNodeResult .get (10L , TimeUnit .MICROSECONDS );
268
248
@@ -272,39 +252,32 @@ <T> MultiNodeResult<T> collectResults(Map<NodeExecution, Future<NodeResult<T>>>
272
252
result .add (nodeResult );
273
253
}
274
254
275
- safeguard .add ( futureId );
255
+ safeguard .put ( futureNodeResult , placeholder );
276
256
}
277
257
} catch (ExecutionException exception ) {
278
- safeguard .add ( futureId );
279
- exceptionHandler . accept (nodeExecution , exception .getCause ());
258
+ safeguard .put ( futureNodeResult , placeholder );
259
+ exceptionCollector . addException (nodeExecution , exception .getCause ());
280
260
} catch (TimeoutException ignore ) {
281
- done = false ;
282
- } catch (InterruptedException cause ) {
261
+ timeout = true ;
262
+ } catch (InterruptedException exception ) {
283
263
Thread .currentThread ().interrupt ();
284
- exceptionHandler . accept (nodeExecution , cause );
264
+ exceptionCollector . addException (nodeExecution , exception );
285
265
break ;
286
266
}
287
267
}
268
+
269
+ if (!timeout ) {
270
+ break ;
271
+ }
288
272
}
289
273
290
- if (! exceptions . isEmpty ()) {
291
- throw new ClusterCommandExecutionFailureException (new ArrayList <>( exceptions . values () ));
274
+ if (exceptionCollector . hasExceptions ()) {
275
+ throw new ClusterCommandExecutionFailureException (exceptionCollector . getExceptions ( ));
292
276
}
293
277
294
278
return result ;
295
279
}
296
280
297
- private BiConsumer <NodeExecution , Throwable > getExceptionHandlerFunction (Map <RedisClusterNode , Throwable > exceptions ) {
298
-
299
- return (nodeExecution , throwable ) -> {
300
-
301
- DataAccessException dataAccessException = convertToDataAccessException ((Exception ) throwable );
302
- Throwable resolvedException = dataAccessException != null ? dataAccessException : throwable ;
303
-
304
- exceptions .putIfAbsent (nodeExecution .getNode (), resolvedException );
305
- };
306
- }
307
-
308
281
/**
309
282
* Run {@link MultiKeyClusterCommandCallback} with on a curated set of nodes serving one or more keys.
310
283
*
@@ -331,8 +304,8 @@ public <S, T> MultiNodeResult<T> executeMultiKeyCommand(MultiKeyClusterCommandCa
331
304
332
305
if (entry .getKey ().isMaster ()) {
333
306
for (PositionalKey key : entry .getValue ()) {
334
- futures .put (new NodeExecution (entry .getKey (), key ), this .executor . submit (() ->
335
- executeMultiKeyCommandOnSingleNode (commandCallback , entry .getKey (), key .getBytes ())));
307
+ futures .put (new NodeExecution (entry .getKey (), key ), this .executor
308
+ . submit (() -> executeMultiKeyCommandOnSingleNode (commandCallback , entry .getKey (), key .getBytes ())));
336
309
}
337
310
}
338
311
}
@@ -458,8 +431,8 @@ boolean isPositional() {
458
431
}
459
432
460
433
/**
461
- * {@link NodeResult} encapsulates the actual {@link T value} returned by a {@link ClusterCommandCallback}
462
- * on a given {@link RedisClusterNode}.
434
+ * {@link NodeResult} encapsulates the actual {@link T value} returned by a {@link ClusterCommandCallback} on a given
435
+ * {@link RedisClusterNode}.
463
436
*
464
437
* @param <T> {@link Class Type} of the {@link Object value} returned in the result.
465
438
* @author Christoph Strobl
@@ -468,9 +441,9 @@ boolean isPositional() {
468
441
*/
469
442
public static class NodeResult <T > {
470
443
471
- private RedisClusterNode node ;
472
- private ByteArrayWrapper key ;
473
- private @ Nullable T value ;
444
+ private final RedisClusterNode node ;
445
+ private final ByteArrayWrapper key ;
446
+ private final @ Nullable T value ;
474
447
475
448
/**
476
449
* Create a new {@link NodeResult}.
@@ -551,9 +524,8 @@ public boolean equals(@Nullable Object obj) {
551
524
return false ;
552
525
}
553
526
554
- return ObjectUtils .nullSafeEquals (this .getNode (), that .getNode ())
555
- && Objects .equals (this .key , that .key )
556
- && Objects .equals (this .getValue (), that .getValue ());
527
+ return ObjectUtils .nullSafeEquals (this .getNode (), that .getNode ()) && Objects .equals (this .key , that .key )
528
+ && Objects .equals (this .getValue (), that .getValue ());
557
529
}
558
530
559
531
@ Override
@@ -757,8 +729,7 @@ public boolean equals(@Nullable Object obj) {
757
729
if (!(obj instanceof PositionalKey that ))
758
730
return false ;
759
731
760
- return this .getPosition () == that .getPosition ()
761
- && ObjectUtils .nullSafeEquals (this .getKey (), that .getKey ());
732
+ return this .getPosition () == that .getPosition () && ObjectUtils .nullSafeEquals (this .getKey (), that .getKey ());
762
733
}
763
734
764
735
@ Override
@@ -836,4 +807,34 @@ public Iterator<PositionalKey> iterator() {
836
807
return this .keys .iterator ();
837
808
}
838
809
}
810
+
811
+ /**
812
+ * Collector for exceptions. Applies translation of exceptions if possible.
813
+ */
814
+ private class NodeExceptionCollector {
815
+
816
+ private final Map <RedisClusterNode , Throwable > exceptions = new HashMap <>();
817
+
818
+ /**
819
+ * @return {@code true} if the collector contains at least one exception.
820
+ */
821
+ public boolean hasExceptions () {
822
+ return !exceptions .isEmpty ();
823
+ }
824
+
825
+ public void addException (NodeExecution execution , Throwable throwable ) {
826
+
827
+ Throwable translated = throwable instanceof Exception e ? convertToDataAccessException (e ) : throwable ;
828
+ Throwable resolvedException = translated != null ? translated : throwable ;
829
+
830
+ exceptions .putIfAbsent (execution .getNode (), resolvedException );
831
+ }
832
+
833
+ /**
834
+ * @return the collected exceptions.
835
+ */
836
+ public List <? extends Throwable > getExceptions () {
837
+ return new ArrayList <>(exceptions .values ());
838
+ }
839
+ }
839
840
}
0 commit comments