22
22
import java .util .concurrent .atomic .AtomicInteger ;
23
23
import java .util .concurrent .atomic .AtomicReference ;
24
24
import java .util .concurrent .locks .Condition ;
25
- import java .util .concurrent .locks .LockSupport ;
26
25
import java .util .concurrent .locks .ReentrantLock ;
27
26
28
27
@@ -63,17 +62,19 @@ public class TarantoolClientImpl extends TarantoolBase<Future<?>> implements Tar
63
62
*/
64
63
protected TarantoolClientStats stats ;
65
64
protected StateHelper state = new StateHelper (StateHelper .RECONNECT );
66
- protected Thread reader ;
67
- protected Thread writer ;
65
+ protected volatile Thread reader ;
66
+ protected volatile Thread writer ;
68
67
69
68
protected Thread connector = new Thread (new Runnable () {
70
69
@ Override
71
70
public void run () {
72
71
while (!Thread .currentThread ().isInterrupted ()) {
73
- if (state .compareAndSet (StateHelper .RECONNECT , 0 )) {
74
- reconnect (0 , thumbstone );
72
+ reconnect (0 , thumbstone );
73
+ try {
74
+ state .awaitReconnection ();
75
+ } catch (InterruptedException e ) {
76
+ Thread .currentThread ().interrupt ();
75
77
}
76
- LockSupport .park (state );
77
78
}
78
79
}
79
80
});
@@ -139,16 +140,13 @@ protected void reconnect(int retry, Throwable lastError) {
139
140
protected void connect (final SocketChannel channel ) throws Exception {
140
141
try {
141
142
TarantoolGreeting greeting = ProtoUtils .connect (channel ,
142
- config .username , config .password );
143
+ config .username , config .password );
143
144
this .serverVersion = greeting .getServerVersion ();
144
145
} catch (IOException e ) {
145
- try {
146
- channel .close ();
147
- } catch (IOException ignored ) {
148
- }
149
-
146
+ closeChannel (channel );
150
147
throw new CommunicationException ("Couldn't connect to tarantool" , e );
151
148
}
149
+
152
150
channel .configureBlocking (false );
153
151
this .channel = channel ;
154
152
this .readChannel = new ReadableViaSelectorChannel (channel );
@@ -174,8 +172,21 @@ public void run() {
174
172
readThread ();
175
173
} finally {
176
174
state .release (StateHelper .READING );
177
- if (state .compareAndSet (0 , StateHelper .RECONNECT ))
178
- LockSupport .unpark (connector );
175
+ // Skip the old gen. attempt to reconnect
176
+ //
177
+ // there're two cases when a read thread is here
178
+ // 1. it's a new generation thread inside/outside
179
+ // a reconnection process (currentThread == reader)
180
+ // 2. It's an old generation thread inside
181
+ // a reconnection process (currentThread != reader)
182
+ // NOTE: reader thread checks the statuses in reverse
183
+ // order the connector thread changes them. It
184
+ // makes this thread be sure that reference to
185
+ // it is not outdated when the state is UNINITIALIZED
186
+ if (state .getState () == StateHelper .UNINITIALIZED
187
+ && Thread .currentThread () == reader ) {
188
+ state .trySignalForReconnection ();
189
+ }
179
190
}
180
191
}
181
192
}
@@ -189,13 +200,33 @@ public void run() {
189
200
writeThread ();
190
201
} finally {
191
202
state .release (StateHelper .WRITING );
192
- if (state .compareAndSet (0 , StateHelper .RECONNECT ))
193
- LockSupport .unpark (connector );
203
+ // Skip the old gen. attempt to reconnect
204
+ //
205
+ // there're two cases when a write thread is here
206
+ // 1. it's a new generation thread inside/outside
207
+ // a reconnection process (currentThread == writer)
208
+ // 2. It's an old generation thread inside
209
+ // a reconnection process (currentThread != writer)
210
+ // NOTE: writer thread checks the statuses in reverse
211
+ // order the connector thread changes them. It
212
+ // makes this thread be sure that reference to
213
+ // it is not outdated when the state is UNINITIALIZED
214
+ if (state .getState () == StateHelper .UNINITIALIZED
215
+ && Thread .currentThread () == writer ) {
216
+ state .trySignalForReconnection ();
217
+ }
194
218
}
195
219
}
196
220
}
197
221
});
198
222
223
+ // reconnection preparation is done
224
+ // before reconnection the state will be released
225
+ // reader/writer threads have been replaced by new ones
226
+ // it's required to be sure that old r/w threads see correct
227
+ // client's r/w references
228
+ state .release (StateHelper .RECONNECT );
229
+
199
230
configureThreads (threadName );
200
231
reader .start ();
201
232
writer .start ();
@@ -337,25 +368,21 @@ private boolean directWrite(ByteBuffer buffer) throws InterruptedException, IOEx
337
368
}
338
369
339
370
protected void readThread () {
340
- try {
341
- while (!Thread .currentThread ().isInterrupted ()) {
342
- try {
343
- TarantoolPacket packet = ProtoUtils .readPacket (readChannel );
371
+ while (!Thread .currentThread ().isInterrupted ()) {
372
+ try {
373
+ TarantoolPacket packet = ProtoUtils .readPacket (readChannel );
344
374
345
- Map <Integer , Object > headers = packet .getHeaders ();
375
+ Map <Integer , Object > headers = packet .getHeaders ();
346
376
347
- Long syncId = (Long ) headers .get (Key .SYNC .getId ());
348
- TarantoolOp <?> future = futures .remove (syncId );
349
- stats .received ++;
350
- wait .decrementAndGet ();
351
- complete (packet , future );
352
- } catch (Exception e ) {
353
- die ("Cant read answer" , e );
354
- return ;
355
- }
377
+ Long syncId = (Long ) headers .get (Key .SYNC .getId ());
378
+ TarantoolOp <?> future = futures .remove (syncId );
379
+ stats .received ++;
380
+ wait .decrementAndGet ();
381
+ complete (packet , future );
382
+ } catch (Exception e ) {
383
+ die ("Cant read answer" , e );
384
+ return ;
356
385
}
357
- } catch (Exception e ) {
358
- die ("Cant init thread" , e );
359
386
}
360
387
}
361
388
@@ -498,7 +525,7 @@ public TarantoolClientOps<Integer, List<?>, Object, List<?>> syncOps() {
498
525
499
526
@ Override
500
527
public TarantoolClientOps <Integer , List <?>, Object , Future <List <?>>> asyncOps () {
501
- return (TarantoolClientOps )this ;
528
+ return (TarantoolClientOps ) this ;
502
529
}
503
530
504
531
@ Override
@@ -514,7 +541,7 @@ public TarantoolClientOps<Integer, List<?>, Object, Long> fireAndForgetOps() {
514
541
515
542
@ Override
516
543
public TarantoolSQLOps <Object , Long , List <Map <String , Object >>> sqlSyncOps () {
517
- return new TarantoolSQLOps <Object , Long , List <Map <String ,Object >>>() {
544
+ return new TarantoolSQLOps <Object , Long , List <Map <String , Object >>>() {
518
545
519
546
@ Override
520
547
public Long update (String sql , Object ... bind ) {
@@ -530,7 +557,7 @@ public List<Map<String, Object>> query(String sql, Object... bind) {
530
557
531
558
@ Override
532
559
public TarantoolSQLOps <Object , Future <Long >, Future <List <Map <String , Object >>>> sqlAsyncOps () {
533
- return new TarantoolSQLOps <Object , Future <Long >, Future <List <Map <String ,Object >>>>() {
560
+ return new TarantoolSQLOps <Object , Future <Long >, Future <List <Map <String , Object >>>>() {
534
561
@ Override
535
562
public Future <Long > update (String sql , Object ... bind ) {
536
563
return (Future <Long >) exec (Code .EXECUTE , Key .SQL_TEXT , sql , Key .SQL_BIND , bind );
@@ -618,6 +645,7 @@ public TarantoolClientStats getStats() {
618
645
* Manages state changes.
619
646
*/
620
647
protected final class StateHelper {
648
+ static final int UNINITIALIZED = 0 ;
621
649
static final int READING = 1 ;
622
650
static final int WRITING = 2 ;
623
651
static final int ALIVE = READING | WRITING ;
@@ -627,10 +655,22 @@ protected final class StateHelper {
627
655
private final AtomicInteger state ;
628
656
629
657
private final AtomicReference <CountDownLatch > nextAliveLatch =
630
- new AtomicReference <CountDownLatch >(new CountDownLatch (1 ));
658
+ new AtomicReference <>(new CountDownLatch (1 ));
631
659
632
660
private final CountDownLatch closedLatch = new CountDownLatch (1 );
633
661
662
+ /**
663
+ * The condition variable to signal a reconnection is needed from reader /
664
+ * writer threads and waiting for that signal from the reconnection thread.
665
+ *
666
+ * The lock variable to access this condition.
667
+ *
668
+ * @see #awaitReconnection()
669
+ * @see #trySignalForReconnection()
670
+ */
671
+ protected final ReentrantLock connectorLock = new ReentrantLock ();
672
+ protected final Condition reconnectRequired = connectorLock .newCondition ();
673
+
634
674
protected StateHelper (int state ) {
635
675
this .state = new AtomicInteger (state );
636
676
}
@@ -639,35 +679,60 @@ protected int getState() {
639
679
return state .get ();
640
680
}
641
681
682
+ /**
683
+ * Set CLOSED state, drop RECONNECT state.
684
+ */
642
685
protected boolean close () {
643
- for (;; ) {
686
+ for (; ; ) {
644
687
int st = getState ();
688
+
689
+ /* CLOSED is the terminal state. */
645
690
if ((st & CLOSED ) == CLOSED )
646
691
return false ;
692
+
693
+ /* Drop RECONNECT, set CLOSED. */
647
694
if (compareAndSet (st , (st & ~RECONNECT ) | CLOSED ))
648
695
return true ;
649
696
}
650
697
}
651
698
699
+ /**
700
+ * Move from a current state to a give one.
701
+ *
702
+ * Some moves are forbidden.
703
+ */
652
704
protected boolean acquire (int mask ) {
653
- for (;;) {
654
- int st = getState ();
655
- if ((st & CLOSED ) == CLOSED )
705
+ for (; ; ) {
706
+ int currentState = getState ();
707
+
708
+ /* CLOSED is the terminal state. */
709
+ if ((currentState & CLOSED ) == CLOSED ) {
710
+ return false ;
711
+ }
712
+
713
+ /* Don't move to READING, WRITING or ALIVE from RECONNECT. */
714
+ if ((currentState & RECONNECT ) > mask ) {
656
715
return false ;
716
+ }
657
717
658
- if ((st & mask ) != 0 )
718
+ /* Cannot move from a state to the same state. */
719
+ if ((currentState & mask ) != 0 ) {
659
720
throw new IllegalStateException ("State is already " + mask );
721
+ }
660
722
661
- if (compareAndSet (st , st | mask ))
723
+ /* Set acquired state. */
724
+ if (compareAndSet (currentState , currentState | mask )) {
662
725
return true ;
726
+ }
663
727
}
664
728
}
665
729
666
730
protected void release (int mask ) {
667
- for (;; ) {
731
+ for (; ; ) {
668
732
int st = getState ();
669
- if (compareAndSet (st , st & ~mask ))
733
+ if (compareAndSet (st , st & ~mask )) {
670
734
return ;
735
+ }
671
736
}
672
737
}
673
738
@@ -686,10 +751,18 @@ protected boolean compareAndSet(int expect, int update) {
686
751
return true ;
687
752
}
688
753
754
+ /**
755
+ * Reconnection uses another way to await state via receiving a signal
756
+ * instead of latches.
757
+ */
689
758
protected void awaitState (int state ) throws InterruptedException {
690
- CountDownLatch latch = getStateLatch (state );
691
- if (latch != null ) {
692
- latch .await ();
759
+ if (state == RECONNECT ) {
760
+ awaitReconnection ();
761
+ } else {
762
+ CountDownLatch latch = getStateLatch (state );
763
+ if (latch != null ) {
764
+ latch .await ();
765
+ }
693
766
}
694
767
}
695
768
@@ -709,10 +782,42 @@ private CountDownLatch getStateLatch(int state) {
709
782
CountDownLatch latch = nextAliveLatch .get ();
710
783
/* It may happen so that an error is detected but the state is still alive.
711
784
Wait for the 'next' alive state in such cases. */
712
- return (getState () == ALIVE && thumbstone == null ) ? null : latch ;
785
+ return (getState () == ALIVE && thumbstone == null ) ? null : latch ;
713
786
}
714
787
return null ;
715
788
}
789
+
790
+ /**
791
+ * Blocks until a reconnection signal will be received.
792
+ *
793
+ * @see #trySignalForReconnection()
794
+ */
795
+ private void awaitReconnection () throws InterruptedException {
796
+ connectorLock .lock ();
797
+ try {
798
+ while (getState () != StateHelper .RECONNECT ) {
799
+ reconnectRequired .await ();
800
+ }
801
+ } finally {
802
+ connectorLock .unlock ();
803
+ }
804
+ }
805
+
806
+ /**
807
+ * Signals to the connector that reconnection process can be performed.
808
+ *
809
+ * @see #awaitReconnection()
810
+ */
811
+ private void trySignalForReconnection () {
812
+ if (compareAndSet (StateHelper .UNINITIALIZED , StateHelper .RECONNECT )) {
813
+ connectorLock .lock ();
814
+ try {
815
+ reconnectRequired .signal ();
816
+ } finally {
817
+ connectorLock .unlock ();
818
+ }
819
+ }
820
+ }
716
821
}
717
822
718
823
protected static class TarantoolOp <V > extends CompletableFuture <V > {
0 commit comments