Skip to content

Commit 8aa0c4f

Browse files
author
Zhen
committed
Merge branch '1.0' into 1.1
2 parents ac16172 + 68e6344 commit 8aa0c4f

File tree

5 files changed

+172
-30
lines changed

5 files changed

+172
-30
lines changed

driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java

Lines changed: 35 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -150,38 +150,47 @@ public void close()
150150
{
151151
throw new ClientException( "This session has already been closed." );
152152
}
153-
else
153+
154+
if ( !connection.isOpen() )
154155
{
155-
synchronized ( this )
156+
// the socket connection is already closed due to some error, cannot send more data
157+
closeConnection();
158+
return;
159+
}
160+
161+
synchronized ( this )
162+
{
163+
if ( currentTransaction != null )
156164
{
157-
if ( currentTransaction != null )
165+
try
158166
{
159-
try
160-
{
161-
currentTransaction.close();
162-
}
163-
catch ( Throwable e )
164-
{
165-
// Best-effort
166-
logger.error( "Failed to close tx due to error: " + e.toString(), e );
167-
}
167+
currentTransaction.close();
168+
}
169+
catch ( Throwable e )
170+
{
171+
// Best-effort
172+
logger.warn( "WARNING: Failed to close tx due to error: " + e.toString() );
168173
}
169174
}
170-
try
171-
{
172-
connection.sync();
173-
}
174-
catch( Throwable t )
175-
{
176-
logger.error( "Failed to sync messages due to error: " + t.toString(), t );
177-
throw t;
178-
}
179-
finally
180-
{
181-
logger.debug( "~~ connection released by [session-%s]", sessionId );
182-
connection.close();
183-
}
184175
}
176+
try
177+
{
178+
connection.sync();
179+
}
180+
catch ( Throwable t )
181+
{
182+
throw t;
183+
}
184+
finally
185+
{
186+
closeConnection();
187+
}
188+
}
189+
190+
private void closeConnection()
191+
{
192+
logger.debug( "~~ connection released by [session-%s]", sessionId );
193+
connection.close();
185194
}
186195

187196
@Override
Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
/**
2+
* Copyright (c) 2002-2016 "Neo Technology,"
3+
* Network Engine for Objects in Lund AB [http://neotechnology.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
package org.neo4j.driver.internal.connector.socket;
20+
21+
import java.io.IOException;
22+
import java.nio.ByteBuffer;
23+
import java.nio.channels.ByteChannel;
24+
25+
import org.neo4j.driver.internal.util.BytePrinter;
26+
import org.neo4j.driver.v1.exceptions.ClientException;
27+
28+
/**
29+
* Utility class for common operations.
30+
*/
31+
public final class SocketUtils
32+
{
33+
private SocketUtils()
34+
{
35+
throw new UnsupportedOperationException( "Do not instantiate" );
36+
}
37+
38+
public static void blockingRead(ByteChannel channel, ByteBuffer buf) throws IOException
39+
{
40+
while(buf.hasRemaining())
41+
{
42+
if (channel.read( buf ) < 0)
43+
{
44+
try
45+
{
46+
channel.close();
47+
}
48+
catch ( IOException e )
49+
{
50+
// best effort
51+
}
52+
String bufStr = BytePrinter.hex( buf ).trim();
53+
throw new ClientException( String.format(
54+
"Connection terminated while receiving data. This can happen due to network " +
55+
"instabilities, or due to restarts of the database. Expected %s bytes, received %s.",
56+
buf.limit(), bufStr.isEmpty() ? "none" : bufStr ) );
57+
}
58+
}
59+
}
60+
61+
public static void blockingWrite(ByteChannel channel, ByteBuffer buf) throws IOException
62+
{
63+
while(buf.hasRemaining())
64+
{
65+
if (channel.write( buf ) < 0)
66+
{
67+
try
68+
{
69+
channel.close();
70+
}
71+
catch ( IOException e )
72+
{
73+
// best effort
74+
}
75+
String bufStr = BytePrinter.hex( buf ).trim();
76+
throw new ClientException( String.format(
77+
"Connection terminated while sending data. This can happen due to network " +
78+
"instabilities, or due to restarts of the database. Expected %s bytes, wrote %s.",
79+
buf.limit(), bufStr.isEmpty() ? "none" :bufStr ) );
80+
}
81+
}
82+
}
83+
}

driver/src/main/java/org/neo4j/driver/internal/net/BufferingChunkedInput.java

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -399,7 +399,7 @@ else if ( buffer.remaining() >= 2 )
399399
* @param buffer The buffer to read into
400400
* @throws IOException
401401
*/
402-
private static void readNextPacket( ReadableByteChannel channel, ByteBuffer buffer ) throws IOException
402+
static void readNextPacket( ReadableByteChannel channel, ByteBuffer buffer ) throws IOException
403403
{
404404
assert !buffer.hasRemaining();
405405

@@ -409,11 +409,18 @@ private static void readNextPacket( ReadableByteChannel channel, ByteBuffer buff
409409
int read = channel.read( buffer );
410410
if ( read == -1 )
411411
{
412+
try
413+
{
414+
channel.close();
415+
}
416+
catch ( IOException e )
417+
{
418+
// best effort
419+
}
412420
throw new ConnectionFailureException(
413421
"Connection terminated while receiving data. This can happen due to network " +
414-
"instabilities, or due to restarts of the database.");
422+
"instabilities, or due to restarts of the database." );
415423
}
416-
buffer.flip();
417424
}
418425
catch ( ClosedByInterruptException e )
419426
{
@@ -430,6 +437,10 @@ private static void readNextPacket( ReadableByteChannel channel, ByteBuffer buff
430437
throw new ConnectionFailureException(
431438
"Unable to process request: " + message + " buffer: \n" + BytePrinter.hex( buffer ), e );
432439
}
440+
finally
441+
{
442+
buffer.flip();
443+
}
433444
}
434445

435446
/**

driver/src/main/java/org/neo4j/driver/internal/net/SocketClient.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,14 @@ void blockingRead( ByteBuffer buf ) throws IOException
7676
{
7777
if (channel.read( buf ) < 0)
7878
{
79+
try
80+
{
81+
channel.close();
82+
}
83+
catch ( IOException e )
84+
{
85+
// best effort
86+
}
7987
String bufStr = BytePrinter.hex( buf ).trim();
8088
throw new ConnectionFailureException( format(
8189
"Connection terminated while receiving data. This can happen due to network " +
@@ -91,6 +99,14 @@ void blockingWrite( ByteBuffer buf ) throws IOException
9199
{
92100
if (channel.write( buf ) < 0)
93101
{
102+
try
103+
{
104+
channel.close();
105+
}
106+
catch ( IOException e )
107+
{
108+
// best effort
109+
}
94110
String bufStr = BytePrinter.hex( buf ).trim();
95111
throw new ConnectionFailureException( format(
96112
"Connection terminated while sending data. This can happen due to network " +

driver/src/test/java/org/neo4j/driver/internal/net/BufferingChunkedInputTest.java

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import static org.hamcrest.CoreMatchers.equalTo;
3838
import static org.hamcrest.MatcherAssert.assertThat;
3939
import static org.junit.Assert.assertEquals;
40+
import static org.junit.Assert.assertFalse;
4041
import static org.junit.Assert.fail;
4142
import static org.mockito.Matchers.any;
4243
import static org.mockito.Mockito.mock;
@@ -492,6 +493,28 @@ public void shouldFailNicelyOnClosedConnections() throws IOException
492493
input.readByte();
493494
}
494495

496+
497+
@Test
498+
public void shouldKeepBufferCorrectWhenError() throws Throwable
499+
{
500+
// Given
501+
ReadableByteChannel channel = mock( ReadableByteChannel.class );
502+
when( channel.read( any( ByteBuffer.class ) ) ).thenReturn( -1 );
503+
ByteBuffer buffer = ByteBuffer.allocate( 8 );
504+
buffer.limit(0);
505+
506+
//Expect
507+
exception.expect( ConnectionFailureException.class );
508+
exception.expectMessage( "Connection terminated while receiving data. This can happen due to network " +
509+
"instabilities, or due to restarts of the database." );
510+
// When
511+
BufferingChunkedInput.readNextPacket( channel, buffer );
512+
assertEquals( buffer.position(), 0 );
513+
assertEquals( buffer.limit(), 0 );
514+
assertEquals( buffer.capacity(), 8 );
515+
assertFalse( channel.isOpen() );
516+
}
517+
495518
private ReadableByteChannel fillPacket( int size, int value )
496519
{
497520
int[] ints = new int[size];
@@ -542,4 +565,4 @@ public void close() throws IOException
542565
};
543566
}
544567

545-
}
568+
}

0 commit comments

Comments
 (0)