26
26
import java .util .concurrent .CompletableFuture ;
27
27
import java .util .concurrent .CompletionStage ;
28
28
import java .util .concurrent .TimeUnit ;
29
+ import java .util .concurrent .atomic .AtomicReference ;
29
30
import java .util .function .Consumer ;
30
31
import java .util .stream .Collectors ;
31
32
32
33
import org .springframework .beans .factory .DisposableBean ;
33
34
import org .springframework .beans .factory .InitializingBean ;
35
+ import org .springframework .context .SmartLifecycle ;
34
36
import org .springframework .dao .DataAccessException ;
35
37
import org .springframework .dao .InvalidDataAccessApiUsageException ;
36
38
import org .springframework .data .redis .ExceptionTranslationStrategy ;
116
118
* @author Andrea Como
117
119
* @author Chris Bono
118
120
*/
119
- public class LettuceConnectionFactory
120
- implements InitializingBean , DisposableBean , RedisConnectionFactory , ReactiveRedisConnectionFactory {
121
+ public class LettuceConnectionFactory implements RedisConnectionFactory , ReactiveRedisConnectionFactory ,
122
+ InitializingBean , DisposableBean , SmartLifecycle {
121
123
122
124
private static final ExceptionTranslationStrategy EXCEPTION_TRANSLATION = new PassThroughExceptionTranslationStrategy (
123
125
LettuceExceptionConverter .INSTANCE );
@@ -144,8 +146,11 @@ public class LettuceConnectionFactory
144
146
145
147
private @ Nullable ClusterCommandExecutor clusterCommandExecutor ;
146
148
147
- private boolean initialized ;
148
- private boolean destroyed ;
149
+ enum State {
150
+ CREATED , STARTING , STARTED , STOPPING , STOPPED , DESTROYED ;
151
+ }
152
+
153
+ private AtomicReference <State > state = new AtomicReference <>(State .CREATED );
149
154
150
155
/**
151
156
* Constructs a new {@link LettuceConnectionFactory} instance with default settings.
@@ -333,33 +338,78 @@ public static RedisConfiguration createRedisConfiguration(RedisURI redisUri) {
333
338
return LettuceConverters .createRedisStandaloneConfiguration (redisUri );
334
339
}
335
340
336
- public void afterPropertiesSet () {
341
+ @ Override
342
+ public void start () {
343
+
344
+ State current = state .getAndUpdate (state -> {
345
+ if (State .CREATED .equals (state ) || State .STOPPED .equals (state )) {
346
+ return State .STARTING ;
347
+ }
348
+ return state ;
349
+ });
337
350
338
- this . client = createClient ();
351
+ if ( State . CREATED . equals ( current ) || State . STOPPED . equals ( current )) {
339
352
340
- this .connectionProvider = new ExceptionTranslatingConnectionProvider (createConnectionProvider (client , CODEC ));
341
- this .reactiveConnectionProvider = new ExceptionTranslatingConnectionProvider (
342
- createConnectionProvider (client , LettuceReactiveRedisConnection .CODEC ));
353
+ this .client = createClient ();
343
354
344
- if (isClusterAware ()) {
355
+ this .connectionProvider = new ExceptionTranslatingConnectionProvider (createConnectionProvider (client , CODEC ));
356
+ this .reactiveConnectionProvider = new ExceptionTranslatingConnectionProvider (
357
+ createConnectionProvider (client , LettuceReactiveRedisConnection .CODEC ));
358
+
359
+ if (isClusterAware ()) {
360
+
361
+ this .clusterCommandExecutor = new ClusterCommandExecutor (
362
+ new LettuceClusterTopologyProvider ((RedisClusterClient ) client ),
363
+ new LettuceClusterConnection .LettuceClusterNodeResourceProvider (this .connectionProvider ),
364
+ EXCEPTION_TRANSLATION );
365
+ }
366
+
367
+ state .set (State .STARTED );
345
368
346
- this .clusterCommandExecutor = new ClusterCommandExecutor (
347
- new LettuceClusterTopologyProvider ((RedisClusterClient ) client ),
348
- new LettuceClusterConnection .LettuceClusterNodeResourceProvider (this .connectionProvider ),
349
- EXCEPTION_TRANSLATION );
369
+ if (getEagerInitialization () && getShareNativeConnection ()) {
370
+ initConnection ();
371
+ }
350
372
}
373
+ }
374
+
375
+ @ Override
376
+ public void stop () {
351
377
352
- this .initialized = true ;
378
+ if (state .compareAndSet (State .STARTED , State .STOPPING )) {
379
+ resetConnection ();
380
+ dispose (connectionProvider );
381
+ dispose (reactiveConnectionProvider );
382
+ try {
383
+ Duration quietPeriod = clientConfiguration .getShutdownQuietPeriod ();
384
+ Duration timeout = clientConfiguration .getShutdownTimeout ();
385
+ client .shutdown (quietPeriod .toMillis (), timeout .toMillis (), TimeUnit .MILLISECONDS );
386
+ state .set (State .STOPPED );
387
+ } catch (Exception e ) {
353
388
354
- if (getEagerInitialization () && getShareNativeConnection ()) {
355
- initConnection ();
389
+ if (log .isWarnEnabled ()) {
390
+ log .warn ((client != null ? ClassUtils .getShortName (client .getClass ()) : "LettuceClient" )
391
+ + " did not shut down gracefully." , e );
392
+ }
393
+ }
394
+ state .set (State .STOPPED );
356
395
}
357
396
}
358
397
359
- public void destroy () {
398
+ @ Override
399
+ public boolean isRunning () {
400
+ return State .STARTED .equals (state .get ());
401
+ }
360
402
361
- resetConnection ();
403
+ @ Override
404
+ public void afterPropertiesSet () {
405
+ // customization hook. initialization happens in start
406
+ }
362
407
408
+ @ Override
409
+ public void destroy () {
410
+
411
+ stop ();
412
+ client = null ;
363
413
if (clusterCommandExecutor != null ) {
364
414
365
415
try {
@@ -368,23 +418,7 @@ public void destroy() {
368
418
log .warn ("Cannot properly close cluster command executor" , ex );
369
419
}
370
420
}
371
-
372
- dispose (connectionProvider );
373
- dispose (reactiveConnectionProvider );
374
-
375
- try {
376
- Duration quietPeriod = clientConfiguration .getShutdownQuietPeriod ();
377
- Duration timeout = clientConfiguration .getShutdownTimeout ();
378
- client .shutdown (quietPeriod .toMillis (), timeout .toMillis (), TimeUnit .MILLISECONDS );
379
- } catch (Exception e ) {
380
-
381
- if (log .isWarnEnabled ()) {
382
- log .warn ((client != null ? ClassUtils .getShortName (client .getClass ()) : "LettuceClient" )
383
- + " did not shut down gracefully." , e );
384
- }
385
- }
386
-
387
- this .destroyed = true ;
421
+ state .set (State .DESTROYED );
388
422
}
389
423
390
424
private void dispose (LettuceConnectionProvider connectionProvider ) {
@@ -532,8 +566,6 @@ public void initConnection() {
532
566
*/
533
567
public void resetConnection () {
534
568
535
- assertInitialized ();
536
-
537
569
Optionals .toStream (Optional .ofNullable (connection ), Optional .ofNullable (reactiveConnection ))
538
570
.forEach (SharedConnection ::resetConnection );
539
571
@@ -1267,8 +1299,19 @@ private RedisClient createBasicClient() {
1267
1299
}
1268
1300
1269
1301
private void assertInitialized () {
1270
- Assert .state (this .initialized , "LettuceConnectionFactory was not initialized through afterPropertiesSet()" );
1271
- Assert .state (!this .destroyed , "LettuceConnectionFactory was destroyed and cannot be used anymore" );
1302
+
1303
+ State current = state .get ();
1304
+
1305
+ if (State .STARTED .equals (current )) {
1306
+ return ;
1307
+ }
1308
+
1309
+ switch (current ) {
1310
+ case CREATED , STOPPED -> throw new IllegalStateException (String .format ("LettuceConnectionFactory has been %s. Use start() to initialize it" , current ));
1311
+ case DESTROYED -> throw new IllegalStateException (
1312
+ "LettuceConnectionFactory was destroyed and cannot be used anymore" );
1313
+ default -> throw new IllegalStateException (String .format ("LettuceConnectionFactory is %s" , current ));
1314
+ }
1272
1315
}
1273
1316
1274
1317
private static void applyToAll (RedisURI source , Consumer <RedisURI > action ) {
0 commit comments