2222import io .netty .channel .ChannelHandlerContext ;
2323import io .netty .channel .ChannelInboundHandlerAdapter ;
2424import java .time .zone .ZoneRulesException ;
25+ import java .util .ArrayDeque ;
26+ import java .util .Queue ;
2527import java .util .concurrent .CompletableFuture ;
2628import java .util .concurrent .CompletionException ;
2729import java .util .concurrent .CompletionStage ;
2830import java .util .concurrent .Executor ;
2931import java .util .concurrent .Executors ;
3032import java .util .function .BiFunction ;
33+ import java .util .function .Consumer ;
3134import neo4j .org .testkit .backend .CustomDriverError ;
3235import neo4j .org .testkit .backend .FrontendError ;
3336import neo4j .org .testkit .backend .TestkitState ;
@@ -48,6 +51,7 @@ public class TestkitRequestProcessorHandler extends ChannelInboundHandlerAdapter
4851 // Some requests require multiple threads
4952 private final Executor requestExecutorService = Executors .newFixedThreadPool (10 );
5053 private Channel channel ;
54+ private ResponseQueueHanlder responseQueueHanlder ;
5155
5256 public TestkitRequestProcessorHandler (BackendMode backendMode , Logging logging ) {
5357 switch (backendMode ) {
@@ -64,22 +68,22 @@ public TestkitRequestProcessorHandler(BackendMode backendMode, Logging logging)
6468 @ Override
6569 public void channelRegistered (ChannelHandlerContext ctx ) throws Exception {
6670 channel = ctx .channel ();
71+ responseQueueHanlder = new ResponseQueueHanlder (channel ::writeAndFlush );
6772 super .channelRegistered (ctx );
6873 }
6974
7075 @ Override
7176 public void channelRead (ChannelHandlerContext ctx , Object msg ) {
7277 // Processing is done in a separate thread to avoid blocking EventLoop because some testing logic, like
7378 // resolvers support, is blocking.
79+ responseQueueHanlder .setResponseReadyAndDispatchFirst ();
7480 requestExecutorService .execute (() -> {
7581 try {
7682 var request = (TestkitRequest ) msg ;
77- var responseStage = processorImpl .apply (request , testkitState );
78- responseStage .whenComplete ((response , throwable ) -> {
79- if (throwable != null ) {
80- ctx .writeAndFlush (createErrorResponse (throwable ));
81- } else if (response != null ) {
82- ctx .writeAndFlush (response );
83+ processorImpl .apply (request , testkitState ).whenComplete ((response , throwable ) -> {
84+ var testkitResponse = throwable != null ? createErrorResponse (throwable ) : response ;
85+ if (testkitResponse != null ) {
86+ responseQueueHanlder .offerAndDispatchFirst (testkitResponse );
8387 }
8488 });
8589 } catch (Throwable throwable ) {
@@ -101,7 +105,8 @@ private static CompletionStage<TestkitResponse> wrapSyncRequest(
101105
102106 @ Override
103107 public void exceptionCaught (ChannelHandlerContext ctx , Throwable cause ) {
104- ctx .writeAndFlush (createErrorResponse (cause ));
108+ var response = createErrorResponse (cause );
109+ responseQueueHanlder .offerAndDispatchFirst (response );
105110 }
106111
107112 private TestkitResponse createErrorResponse (Throwable throwable ) {
@@ -165,7 +170,7 @@ private void writeAndFlush(TestkitResponse response) {
165170 if (channel == null ) {
166171 throw new IllegalStateException ("Called before channel is initialized" );
167172 }
168- channel . writeAndFlush (response );
173+ responseQueueHanlder . offerAndDispatchFirst (response );
169174 }
170175
171176 public enum BackendMode {
@@ -174,4 +179,34 @@ public enum BackendMode {
174179 REACTIVE_LEGACY ,
175180 REACTIVE
176181 }
182+
183+ private static class ResponseQueueHanlder {
184+ private final Consumer <TestkitResponse > responseWriter ;
185+ private final Queue <TestkitResponse > responseQueue = new ArrayDeque <>();
186+ private boolean responseReady ;
187+
188+ ResponseQueueHanlder (Consumer <TestkitResponse > responseWriter ) {
189+ this .responseWriter = responseWriter ;
190+ }
191+
192+ synchronized void setResponseReadyAndDispatchFirst () {
193+ responseReady = true ;
194+ dispatchFirst ();
195+ }
196+
197+ synchronized void offerAndDispatchFirst (TestkitResponse response ) {
198+ responseQueue .offer (response );
199+ if (responseReady ) {
200+ dispatchFirst ();
201+ }
202+ }
203+
204+ private synchronized void dispatchFirst () {
205+ var response = responseQueue .poll ();
206+ if (response != null ) {
207+ responseReady = false ;
208+ responseWriter .accept (responseQueue .poll ());
209+ }
210+ }
211+ }
177212}
0 commit comments