1818 */
1919package neo4j .org .testkit .backend .channel .handler ;
2020
21+ import static org .neo4j .driver .internal .util .LockUtil .executeWithLock ;
22+
2123import io .netty .channel .Channel ;
2224import io .netty .channel .ChannelHandlerContext ;
2325import io .netty .channel .ChannelInboundHandlerAdapter ;
2426import java .time .zone .ZoneRulesException ;
27+ import java .util .ArrayDeque ;
28+ import java .util .Queue ;
2529import java .util .concurrent .CompletableFuture ;
2630import java .util .concurrent .CompletionException ;
2731import java .util .concurrent .CompletionStage ;
2832import java .util .concurrent .Executor ;
2933import java .util .concurrent .Executors ;
34+ import java .util .concurrent .locks .Lock ;
35+ import java .util .concurrent .locks .ReentrantLock ;
3036import java .util .function .BiFunction ;
37+ import java .util .function .Consumer ;
3138import neo4j .org .testkit .backend .CustomDriverError ;
3239import neo4j .org .testkit .backend .FrontendError ;
3340import neo4j .org .testkit .backend .TestkitState ;
@@ -47,6 +54,9 @@ public class TestkitRequestProcessorHandler extends ChannelInboundHandlerAdapter
4754 private final BiFunction <TestkitRequest , TestkitState , CompletionStage <TestkitResponse >> processorImpl ;
4855 // Some requests require multiple threads
4956 private final Executor requestExecutorService = Executors .newFixedThreadPool (10 );
57+ private final Lock lock = new ReentrantLock ();
58+ private final Queue <TestkitResponse > responseQueue = new ArrayDeque <>();
59+ private boolean canDispatchResponse = false ;
5060 private Channel channel ;
5161
5262 public TestkitRequestProcessorHandler (BackendMode backendMode , Logging logging ) {
@@ -71,15 +81,26 @@ public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
7181 public void channelRead (ChannelHandlerContext ctx , Object msg ) {
7282 // Processing is done in a separate thread to avoid blocking EventLoop because some testing logic, like
7383 // resolvers support, is blocking.
84+ executeWithLock (lock , () -> {
85+ canDispatchResponse = true ;
86+ trySending (ctx ::writeAndFlush );
87+ });
7488 requestExecutorService .execute (() -> {
7589 try {
7690 var request = (TestkitRequest ) msg ;
7791 var responseStage = processorImpl .apply (request , testkitState );
7892 responseStage .whenComplete ((response , throwable ) -> {
7993 if (throwable != null ) {
80- ctx .writeAndFlush (createErrorResponse (throwable ));
94+ var errorResponse = createErrorResponse (throwable );
95+ executeWithLock (lock , () -> {
96+ responseQueue .offer (errorResponse );
97+ trySending (ctx ::writeAndFlush );
98+ });
8199 } else if (response != null ) {
82- ctx .writeAndFlush (response );
100+ executeWithLock (lock , () -> {
101+ responseQueue .offer (response );
102+ trySending (ctx ::writeAndFlush );
103+ });
83104 }
84105 });
85106 } catch (Throwable throwable ) {
@@ -88,6 +109,13 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) {
88109 });
89110 }
90111
112+ private void trySending (Consumer <TestkitResponse > responseWriter ) {
113+ if (canDispatchResponse && !responseQueue .isEmpty ()) {
114+ canDispatchResponse = false ;
115+ responseWriter .accept (responseQueue .poll ());
116+ }
117+ }
118+
91119 private static CompletionStage <TestkitResponse > wrapSyncRequest (
92120 TestkitRequest testkitRequest , TestkitState testkitState ) {
93121 var result = new CompletableFuture <TestkitResponse >();
@@ -101,7 +129,11 @@ private static CompletionStage<TestkitResponse> wrapSyncRequest(
101129
102130 @ Override
103131 public void exceptionCaught (ChannelHandlerContext ctx , Throwable cause ) {
104- ctx .writeAndFlush (createErrorResponse (cause ));
132+ var response = createErrorResponse (cause );
133+ executeWithLock (lock , () -> {
134+ responseQueue .offer (response );
135+ trySending (ctx ::writeAndFlush );
136+ });
105137 }
106138
107139 private TestkitResponse createErrorResponse (Throwable throwable ) {
@@ -165,7 +197,10 @@ private void writeAndFlush(TestkitResponse response) {
165197 if (channel == null ) {
166198 throw new IllegalStateException ("Called before channel is initialized" );
167199 }
168- channel .writeAndFlush (response );
200+ executeWithLock (lock , () -> {
201+ responseQueue .offer (response );
202+ trySending (channel ::writeAndFlush );
203+ });
169204 }
170205
171206 public enum BackendMode {
0 commit comments