1919import static java .util .Objects .requireNonNull ;
2020
2121import io .netty .bootstrap .Bootstrap ;
22+ import io .netty .channel .Channel ;
2223import io .netty .channel .ChannelFuture ;
23- import io .netty .channel .ChannelFutureListener ;
2424import io .netty .channel .ChannelOption ;
25- import io .netty .channel .ChannelPromise ;
2625import io .netty .channel .EventLoopGroup ;
2726import io .netty .channel .local .LocalAddress ;
2827import io .netty .channel .local .LocalChannel ;
4746import org .neo4j .driver .internal .bolt .basicimpl .async .NetworkConnection ;
4847import org .neo4j .driver .internal .bolt .basicimpl .async .connection .ChannelConnectedListener ;
4948import org .neo4j .driver .internal .bolt .basicimpl .async .connection .ChannelPipelineBuilderImpl ;
50- import org .neo4j .driver .internal .bolt .basicimpl .async .connection .HandshakeCompletedListener ;
5149import org .neo4j .driver .internal .bolt .basicimpl .async .connection .NettyChannelInitializer ;
5250import org .neo4j .driver .internal .bolt .basicimpl .async .connection .NettyDomainNameResolverGroup ;
5351import org .neo4j .driver .internal .bolt .basicimpl .async .inbound .ConnectTimeoutHandler ;
52+ import org .neo4j .driver .internal .bolt .basicimpl .messaging .BoltProtocol ;
5453import org .neo4j .driver .internal .bolt .basicimpl .spi .Connection ;
5554
5655public final class NettyConnectionProvider implements ConnectionProvider {
@@ -69,7 +68,7 @@ public NettyConnectionProvider(
6968 LocalAddress localAddress ,
7069 LoggingProvider logging ) {
7170 this .eventLoopGroup = eventLoopGroup ;
72- this .clock = clock ;
71+ this .clock = requireNonNull ( clock ) ;
7372 this .domainNameResolver = requireNonNull (domainNameResolver );
7473 this .addressResolverGroup = new NettyDomainNameResolverGroup (this .domainNameResolver );
7574 this .localAddress = localAddress ;
@@ -111,42 +110,22 @@ public CompletionStage<Connection> acquireConnection(
111110 socketAddress = localAddress ;
112111 }
113112
114- var connectedFuture = bootstrap .connect (socketAddress );
115-
116- var channel = connectedFuture .channel ();
117- var handshakeCompleted = channel .newPromise ();
118- var connectionInitialized = channel .newPromise ();
119-
120- installChannelConnectedListeners (address , connectedFuture , handshakeCompleted , connectTimeoutMillis );
121- installHandshakeCompletedListeners (
122- handshakeCompleted ,
123- connectionInitialized ,
124- address ,
125- routingContext ,
126- authMap ,
127- boltAgent ,
128- userAgent ,
129- latestAuthMillisFuture ,
130- notificationConfig );
131-
132- var future = new CompletableFuture <Connection >();
133- connectionInitialized .addListener ((ChannelFutureListener ) f -> {
134- var throwable = f .cause ();
135- if (throwable != null ) {
136- future .completeExceptionally (throwable );
137- } else {
138- var connection = new NetworkConnection (channel , logging );
139- future .complete (connection );
140- }
141- });
142- return future ;
113+ return installChannelConnectedListeners (address , bootstrap .connect (socketAddress ), connectTimeoutMillis )
114+ .thenCompose (channel -> BoltProtocol .forChannel (channel )
115+ .initializeChannel (
116+ channel ,
117+ requireNonNull (userAgent ),
118+ requireNonNull (boltAgent ),
119+ authMap ,
120+ routingContext ,
121+ notificationConfig ,
122+ clock ,
123+ latestAuthMillisFuture ))
124+ .thenApply (channel -> new NetworkConnection (channel , logging ));
143125 }
144126
145- private void installChannelConnectedListeners (
146- BoltServerAddress address ,
147- ChannelFuture channelConnected ,
148- ChannelPromise handshakeCompleted ,
149- int connectTimeoutMillis ) {
127+ private CompletionStage <Channel > installChannelConnectedListeners (
128+ BoltServerAddress address , ChannelFuture channelConnected , int connectTimeoutMillis ) {
150129 var pipeline = channelConnected .channel ().pipeline ();
151130
152131 // add timeout handler to the pipeline when channel is connected. it's needed to
@@ -156,42 +135,16 @@ private void installChannelConnectedListeners(
156135 channelConnected .addListener (future -> pipeline .addFirst (new ConnectTimeoutHandler (connectTimeoutMillis )));
157136
158137 // add listener that sends Bolt handshake bytes when channel is connected
138+ var handshakeCompleted = new CompletableFuture <Channel >();
159139 channelConnected .addListener (
160140 new ChannelConnectedListener (address , new ChannelPipelineBuilderImpl (), handshakeCompleted , logging ));
161- }
162-
163- private void installHandshakeCompletedListeners (
164- ChannelPromise handshakeCompleted ,
165- ChannelPromise connectionInitialized ,
166- BoltServerAddress address ,
167- RoutingContext routingContext ,
168- Map <String , Value > authMap ,
169- BoltAgent boltAgent ,
170- String userAgent ,
171- CompletableFuture <Long > latestAuthMillisFuture ,
172- NotificationConfig notificationConfig ) {
173- var pipeline = handshakeCompleted .channel ().pipeline ();
174-
175- // remove timeout handler from the pipeline once TLS and Bolt handshakes are
176- // completed. regular protocol
177- // messages will flow next and we do not want to have read timeout for them
178- handshakeCompleted .addListener (future -> {
179- if (future .isSuccess ()) {
180- pipeline .remove (ConnectTimeoutHandler .class );
141+ return handshakeCompleted .whenComplete ((channel , throwable ) -> {
142+ if (throwable == null ) {
143+ // remove timeout handler from the pipeline once TLS and Bolt handshakes are
144+ // completed. regular protocol
145+ // messages will flow next and we do not want to have read timeout for them
146+ channel .pipeline ().remove (ConnectTimeoutHandler .class );
181147 }
182148 });
183-
184- // add listener that sends an INIT message. connection is now fully established.
185- // channel pipeline is fully
186- // set to send/receive messages for a selected protocol version
187- handshakeCompleted .addListener (new HandshakeCompletedListener (
188- authMap ,
189- userAgent ,
190- boltAgent ,
191- routingContext ,
192- connectionInitialized ,
193- notificationConfig ,
194- this .clock ,
195- latestAuthMillisFuture ));
196149 }
197150}
0 commit comments