11/**
22 * Copyright (c) 2002-2016 "Neo Technology,"
33 * Network Engine for Objects in Lund AB [http://neotechnology.com]
4- *
4+ * <p>
55 * This file is part of Neo4j.
6- *
6+ * <p>
77 * Licensed under the Apache License, Version 2.0 (the "License");
88 * you may not use this file except in compliance with the License.
99 * You may obtain a copy of the License at
10- *
11- * http://www.apache.org/licenses/LICENSE-2.0
12- *
10+ * <p>
11+ * http://www.apache.org/licenses/LICENSE-2.0
12+ * <p>
1313 * Unless required by applicable law or agreed to in writing, software
1414 * distributed under the License is distributed on an "AS IS" BASIS,
1515 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -100,8 +100,11 @@ public State readChunkSize( BufferingChunkedInput ctx ) throws IOException
100100 {
101101 if ( ctx .buffer .remaining () == 0 )
102102 {
103- //buffer empty, read next packet and try again
104- readNextPacket ( ctx .channel , ctx .buffer );
103+ //buffer empty, block until you get at least at least one byte
104+ while ( ctx .buffer .remaining () == 0 )
105+ {
106+ readNextPacket ( ctx .channel , ctx .buffer );
107+ }
105108 return AWAITING_CHUNK .readChunkSize ( ctx );
106109 }
107110 else if ( ctx .buffer .remaining () >= 2 )
@@ -132,7 +135,7 @@ public State read( BufferingChunkedInput ctx ) throws IOException
132135 public State peekByte ( BufferingChunkedInput ctx ) throws IOException
133136 {
134137 //read chunk size and then proceed to read the rest of the chunk.
135- return readChunkSize ( ctx ).peekByte ( ctx );
138+ return readChunkSize ( ctx ).peekByte ( ctx );
136139 }
137140 },
138141 IN_CHUNK
@@ -175,7 +178,7 @@ else if ( ctx.buffer.remaining() < ctx.scratchBuffer.remaining() )
175178 int bytesToRead = min ( ctx .scratchBuffer .remaining (), ctx .remainingChunkSize );
176179 copyBytes ( ctx .buffer , ctx .scratchBuffer , bytesToRead );
177180 ctx .remainingChunkSize -= bytesToRead ;
178- if (ctx .scratchBuffer .remaining () == 0 )
181+ if ( ctx .scratchBuffer .remaining () == 0 )
179182 {
180183 //we have written all data that was asked for us
181184 return IN_CHUNK ;
@@ -229,7 +232,7 @@ public State readChunkSize( BufferingChunkedInput ctx ) throws IOException
229232 }
230233
231234 @ Override
232- public State read ( BufferingChunkedInput ctx ) throws IOException
235+ public State read ( BufferingChunkedInput ctx ) throws IOException
233236 {
234237 throw new IllegalStateException ( "Cannot read data while in progress of reading header" );
235238 }
@@ -271,25 +274,34 @@ public State peekByte( BufferingChunkedInput ctx ) throws IOException
271274 * @param buffer The buffer to read into
272275 * @throws IOException
273276 */
274- private static void readNextPacket (ReadableByteChannel channel , ByteBuffer buffer ) throws IOException
277+ private static void readNextPacket ( ReadableByteChannel channel , ByteBuffer buffer ) throws IOException
275278 {
276279 try
277280 {
278281 buffer .clear ();
279- channel .read ( buffer );
282+ int read = channel .read ( buffer );
283+ if ( read == -1 )
284+ {
285+ throw new ClientException (
286+ "Connection terminated while receiving data. This can happen due to network " +
287+ "instabilities, or due to restarts of the database." );
288+ }
280289 buffer .flip ();
281290 }
282- catch ( ClosedByInterruptException e )
291+ catch ( ClosedByInterruptException e )
283292 {
284293 throw new ClientException (
285- "Connection to the database was lost because someone called `interrupt()` on the driver thread waiting for a reply. " +
286- "This normally happens because the JVM is shutting down, but it can also happen because your application code or some " +
294+ "Connection to the database was lost because someone called `interrupt()` on the driver " +
295+ "thread waiting for a reply. " +
296+ "This normally happens because the JVM is shutting down, but it can also happen because your " +
297+ "application code or some " +
287298 "framework you are using is manually interrupting the thread." );
288299 }
289300 catch ( IOException e )
290301 {
291302 String message = e .getMessage () == null ? e .getClass ().getSimpleName () : e .getMessage ();
292- throw new ClientException ( "Unable to process request: " + message + " buffer: \n " + BytePrinter .hex ( buffer ), e );
303+ throw new ClientException (
304+ "Unable to process request: " + message + " buffer: \n " + BytePrinter .hex ( buffer ), e );
293305 }
294306 }
295307
@@ -368,7 +380,7 @@ public PackInput readBytes( byte[] into, int offset, int toRead ) throws IOExcep
368380 public byte peekByte () throws IOException
369381 {
370382 state = state .peekByte ( this );
371- return buffer .get (buffer .position ());
383+ return buffer .get ( buffer .position () );
372384 }
373385
374386 private boolean hasMoreDataUnreadInCurrentChunk ()
@@ -382,7 +394,7 @@ private boolean hasMoreDataUnreadInCurrentChunk()
382394 public void run ()
383395 {
384396 // the on message complete should only be called when no data unread from the message buffer
385- if ( hasMoreDataUnreadInCurrentChunk () )
397+ if ( hasMoreDataUnreadInCurrentChunk () )
386398 {
387399 throw new ClientException ( "Trying to read message complete ending '00 00' while there are more data " +
388400 "left in the message content unread: buffer [" +
@@ -392,11 +404,12 @@ public void run()
392404 try
393405 {
394406 // read message boundary
395- state .readChunkSize ( BufferingChunkedInput .this );
407+ state .readChunkSize ( BufferingChunkedInput .this );
396408 if ( remainingChunkSize != 0 )
397409 {
398410 throw new ClientException ( "Expecting message complete ending '00 00', but got " +
399- BytePrinter .hex ( ByteBuffer .allocate ( 2 ).putShort ( (short ) remainingChunkSize ) ) );
411+ BytePrinter .hex ( ByteBuffer .allocate ( 2 )
412+ .putShort ( (short ) remainingChunkSize ) ) );
400413 }
401414 }
402415 catch ( IOException e )
0 commit comments