8
8
import java .util .Iterator ;
9
9
import java .util .List ;
10
10
import java .util .Map ;
11
+ import java .util .concurrent .CompletableFuture ;
12
+ import java .util .concurrent .CompletionStage ;
11
13
import java .util .concurrent .ConcurrentHashMap ;
12
14
import java .util .concurrent .CountDownLatch ;
13
15
import java .util .concurrent .ExecutionException ;
@@ -31,7 +33,7 @@ public class TarantoolClientImpl extends TarantoolBase<Future<List<?>>> implemen
31
33
protected volatile Exception thumbstone ;
32
34
protected volatile CountDownLatch alive ;
33
35
34
- protected Map <Long , FutureImpl <List <?>>> futures ;
36
+ protected Map <Long , CompletableFuture <List <?>>> futures ;
35
37
protected AtomicInteger wait = new AtomicInteger ();
36
38
/**
37
39
* Write properties
@@ -49,6 +51,7 @@ public class TarantoolClientImpl extends TarantoolBase<Future<List<?>>> implemen
49
51
*/
50
52
protected SyncOps syncOps ;
51
53
protected FireAndForgetOps fireAndForgetOps ;
54
+ protected ComposableAsyncOps composableAsyncOps ;
52
55
53
56
/**
54
57
* Inner
@@ -77,7 +80,7 @@ public TarantoolClientImpl(SocketChannelProvider socketProvider, TarantoolClient
77
80
this .alive = new CountDownLatch (1 );
78
81
this .socketProvider = socketProvider ;
79
82
this .stats = new TarantoolClientStats ();
80
- this .futures = new ConcurrentHashMap <Long , FutureImpl < List <?>> >(config .predictedFutures );
83
+ this .futures = new ConcurrentHashMap <>(config .predictedFutures );
81
84
this .sharedBuffer = ByteBuffer .allocateDirect (config .sharedBufferSize );
82
85
this .writerBuffer = ByteBuffer .allocateDirect (sharedBuffer .capacity ());
83
86
this .connector .setDaemon (true );
@@ -197,21 +200,22 @@ protected void configureThreads(String threadName) {
197
200
}
198
201
199
202
200
- public Future <List <?>> exec (Code code , Object ... args ) {
203
+ public CompletableFuture <List <?>> exec (Code code , Object ... args ) {
201
204
validateArgs (args );
202
- FutureImpl <List <?>> q = new FutureImpl <List <?>>(syncId .incrementAndGet ());
205
+ long sid = syncId .incrementAndGet ();
206
+ CompletableFuture <List <?>> q = new CompletableFuture <>();
203
207
if (isDead (q )) {
204
208
return q ;
205
209
}
206
- futures .put (q . getId () , q );
210
+ futures .put (sid , q );
207
211
if (isDead (q )) {
208
- futures .remove (q . getId () );
212
+ futures .remove (sid );
209
213
return q ;
210
214
}
211
215
try {
212
- write (code , q . getId () , null , args );
216
+ write (code , sid , null , args );
213
217
} catch (Exception e ) {
214
- futures .remove (q . getId () );
218
+ futures .remove (sid );
215
219
fail (q , e );
216
220
}
217
221
return q ;
@@ -225,11 +229,11 @@ protected synchronized void die(String message, Exception cause) {
225
229
this .thumbstone = new CommunicationException (message , cause );
226
230
this .alive = new CountDownLatch (1 );
227
231
while (!futures .isEmpty ()) {
228
- Iterator <Map .Entry <Long , FutureImpl <List <?>>>> iterator = futures .entrySet ().iterator ();
232
+ Iterator <Map .Entry <Long , CompletableFuture <List <?>>>> iterator = futures .entrySet ().iterator ();
229
233
while (iterator .hasNext ()) {
230
- Map .Entry <Long , FutureImpl <List <?>>> elem = iterator .next ();
234
+ Map .Entry <Long , CompletableFuture <List <?>>> elem = iterator .next ();
231
235
if (elem != null ) {
232
- FutureImpl <List <?>> future = elem .getValue ();
236
+ CompletableFuture <List <?>> future = elem .getValue ();
233
237
fail (future , cause );
234
238
}
235
239
iterator .remove ();
@@ -333,7 +337,7 @@ protected void readThread() {
333
337
readPacket (is );
334
338
code = (Long ) headers .get (Key .CODE .getId ());
335
339
Long syncId = (Long ) headers .get (Key .SYNC .getId ());
336
- FutureImpl <List <?>> future = futures .remove (syncId );
340
+ CompletableFuture <List <?>> future = futures .remove (syncId );
337
341
stats .received ++;
338
342
wait .decrementAndGet ();
339
343
complete (code , future );
@@ -381,14 +385,14 @@ protected void writeThread() {
381
385
}
382
386
383
387
384
- protected void fail (FutureImpl <List <?>> q , Exception e ) {
385
- q .setError (e );
388
+ protected void fail (CompletableFuture <List <?>> q , Exception e ) {
389
+ q .completeExceptionally (e );
386
390
}
387
391
388
- protected void complete (long code , FutureImpl <List <?>> q ) {
392
+ protected void complete (long code , CompletableFuture <List <?>> q ) {
389
393
if (q != null ) {
390
394
if (code == 0 ) {
391
- q .setValue ((List ) body .get (Key .DATA .getId ()));
395
+ q .complete ((List ) body .get (Key .DATA .getId ()));
392
396
} else {
393
397
Object error = body .get (Key .ERROR .getId ());
394
398
fail (q , serverError (code , error ));
@@ -488,6 +492,11 @@ public TarantoolClientOps<Integer, List<?>, Object, Future<List<?>>> asyncOps()
488
492
return this ;
489
493
}
490
494
495
+ @ Override
496
+ public TarantoolClientOps <Integer , List <?>, Object , CompletionStage <List <?>>> composableAsyncOps () {
497
+ return composableAsyncOps ;
498
+ }
499
+
491
500
@ Override
492
501
public TarantoolClientOps <Integer , List <?>, Object , Long > fireAndForgetOps () {
493
502
return fireAndForgetOps ;
@@ -529,7 +538,19 @@ public void close() {
529
538
}
530
539
}
531
540
532
- protected boolean isDead (FutureImpl <List <?>> q ) {
541
+ protected class ComposableAsyncOps extends AbstractTarantoolOps <Integer , List <?>, Object , CompletionStage <List <?>>> {
542
+ @ Override
543
+ public CompletionStage <List <?>> exec (Code code , Object ... args ) {
544
+ return TarantoolClientImpl .this .exec (code , args );
545
+ }
546
+
547
+ @ Override
548
+ public void close () {
549
+ TarantoolClientImpl .this .close ();
550
+ }
551
+ }
552
+
553
+ protected boolean isDead (CompletableFuture <List <?>> q ) {
533
554
if (TarantoolClientImpl .this .thumbstone != null ) {
534
555
fail (q , new CommunicationException ("Connection is dead" , thumbstone ));
535
556
return true ;
0 commit comments