@@ -295,60 +295,66 @@ private void handleWrite(byte[] b, int off, long len, boolean retry)
295295 throws IOException {
296296 int succeededAllocates = 0 ;
297297 while (len > 0 ) {
298- if (streamEntries .size () <= currentStreamIndex ) {
299- Preconditions .checkNotNull (omClient );
300- // allocate a new block, if a exception happens, log an error and
301- // throw exception to the caller directly, and the write fails.
298+ try {
299+ if (streamEntries .size () <= currentStreamIndex ) {
300+ Preconditions .checkNotNull (omClient );
301+ // allocate a new block, if a exception happens, log an error and
302+ // throw exception to the caller directly, and the write fails.
303+ try {
304+ allocateNewBlock (currentStreamIndex );
305+ succeededAllocates += 1 ;
306+ } catch (IOException ioe ) {
307+ LOG .error ("Try to allocate more blocks for write failed, already "
308+ + "allocated " + succeededAllocates
309+ + " blocks for this write." );
310+ throw ioe ;
311+ }
312+ }
313+ // in theory, this condition should never violate due the check above
314+ // still do a sanity check.
315+ Preconditions .checkArgument (currentStreamIndex < streamEntries .size ());
316+ BlockOutputStreamEntry current = streamEntries .get (currentStreamIndex );
317+
318+ // length(len) will be in int range if the call is happening through
319+ // write API of blockOutputStream. Length can be in long range if it comes
320+ // via Exception path.
321+ int writeLen = Math .min ((int ) len , (int ) current .getRemaining ());
322+ long currentPos = current .getWrittenDataLength ();
302323 try {
303- allocateNewBlock (currentStreamIndex );
304- succeededAllocates += 1 ;
324+ if (retry ) {
325+ current .writeOnRetry (len );
326+ } else {
327+ current .write (b , off , writeLen );
328+ offset += writeLen ;
329+ }
305330 } catch (IOException ioe ) {
306- LOG .error ("Try to allocate more blocks for write failed, already "
307- + "allocated " + succeededAllocates + " blocks for this write." );
308- throw ioe ;
309- }
310- }
311- // in theory, this condition should never violate due the check above
312- // still do a sanity check.
313- Preconditions .checkArgument (currentStreamIndex < streamEntries .size ());
314- BlockOutputStreamEntry current = streamEntries .get (currentStreamIndex );
315-
316- // length(len) will be in int range if the call is happening through
317- // write API of blockOutputStream. Length can be in long range if it comes
318- // via Exception path.
319- int writeLen = Math .min ((int )len , (int ) current .getRemaining ());
320- long currentPos = current .getWrittenDataLength ();
321- try {
322- if (retry ) {
323- current .writeOnRetry (len );
324- } else {
325- current .write (b , off , writeLen );
326- offset += writeLen ;
331+ // for the current iteration, totalDataWritten - currentPos gives the
332+ // amount of data already written to the buffer
333+
334+ // In the retryPath, the total data to be written will always be equal
335+ // to or less than the max length of the buffer allocated.
336+ // The len specified here is the combined sum of the data length of
337+ // the buffers
338+ Preconditions .checkState (!retry || len <= streamBufferMaxSize );
339+ int dataWritten = (int ) (current .getWrittenDataLength () - currentPos );
340+ writeLen = retry ? (int ) len : dataWritten ;
341+ // In retry path, the data written is already accounted in offset.
342+ if (!retry ) {
343+ offset += writeLen ;
344+ }
345+ LOG .debug ("writeLen {}, total len {}" , writeLen , len );
346+ handleException (current , currentStreamIndex , ioe );
327347 }
328- } catch (IOException ioe ) {
329- // for the current iteration, totalDataWritten - currentPos gives the
330- // amount of data already written to the buffer
331-
332- // In the retryPath, the total data to be written will always be equal
333- // to or less than the max length of the buffer allocated.
334- // The len specified here is the combined sum of the data length of
335- // the buffers
336- Preconditions .checkState (!retry || len <= streamBufferMaxSize );
337- int dataWritten = (int ) (current .getWrittenDataLength () - currentPos );
338- writeLen = retry ? (int ) len : dataWritten ;
339- // In retry path, the data written is already accounted in offset.
340- if (!retry ) {
341- offset += writeLen ;
348+ if (current .getRemaining () <= 0 ) {
349+ // since the current block is already written close the stream.
350+ handleFlushOrClose (StreamAction .FULL );
342351 }
343- LOG .debug ("writeLen {}, total len {}" , writeLen , len );
344- handleException (current , currentStreamIndex , ioe );
345- }
346- if (current .getRemaining () <= 0 ) {
347- // since the current block is already written close the stream.
348- handleFlushOrClose (StreamAction .FULL );
352+ len -= writeLen ;
353+ off += writeLen ;
354+ } catch (Exception e ) {
355+ markStreamClosed ();
356+ throw e ;
349357 }
350- len -= writeLen ;
351- off += writeLen ;
352358 }
353359 }
354360
@@ -365,7 +371,7 @@ private void discardPreallocatedBlocks(long containerID,
365371 // pre allocated blocks available.
366372
367373 // This will be called only to discard the next subsequent unused blocks
368- // in the sreamEntryList .
374+ // in the streamEntryList .
369375 if (streamIndex < streamEntries .size ()) {
370376 ListIterator <BlockOutputStreamEntry > streamEntryIterator =
371377 streamEntries .listIterator (streamIndex );
@@ -398,6 +404,20 @@ private void removeEmptyBlocks() {
398404 }
399405 }
400406 }
407+
408+ private void cleanup () {
409+ if (excludeList != null ) {
410+ excludeList .clear ();
411+ excludeList = null ;
412+ }
413+ if (bufferPool != null ) {
414+ bufferPool .clearBufferPool ();
415+ }
416+
417+ if (streamEntries != null ) {
418+ streamEntries .clear ();
419+ }
420+ }
401421 /**
402422 * It performs following actions :
403423 * a. Updates the committed length at datanode for the current stream in
@@ -418,8 +438,7 @@ private void handleException(BlockOutputStreamEntry streamEntry,
418438 closedContainerException = checkIfContainerIsClosed (t );
419439 }
420440 PipelineID pipelineId = null ;
421- long totalSuccessfulFlushedData =
422- streamEntry .getTotalAckDataLength ();
441+ long totalSuccessfulFlushedData = streamEntry .getTotalAckDataLength ();
423442 //set the correct length for the current stream
424443 streamEntry .setCurrentPosition (totalSuccessfulFlushedData );
425444 long bufferedDataLen = computeBufferData ();
@@ -450,8 +469,8 @@ private void handleException(BlockOutputStreamEntry streamEntry,
450469 if (closedContainerException ) {
451470 // discard subsequent pre allocated blocks from the streamEntries list
452471 // from the closed container
453- discardPreallocatedBlocks (streamEntry .getBlockID ().getContainerID (),
454- null , streamIndex + 1 );
472+ discardPreallocatedBlocks (streamEntry .getBlockID ().getContainerID (), null ,
473+ streamIndex + 1 );
455474 } else {
456475 // In case there is timeoutException or Watch for commit happening over
457476 // majority or the client connection failure to the leader in the
@@ -475,6 +494,11 @@ private void handleException(BlockOutputStreamEntry streamEntry,
475494 }
476495 }
477496
497+ private void markStreamClosed () {
498+ cleanup ();
499+ closed = true ;
500+ }
501+
478502 private void handleRetry (IOException exception , long len ) throws IOException {
479503 RetryPolicy .RetryAction action ;
480504 try {
@@ -586,40 +610,46 @@ private void handleFlushOrClose(StreamAction op) throws IOException {
586610 return ;
587611 }
588612 while (true ) {
589- int size = streamEntries .size ();
590- int streamIndex =
591- currentStreamIndex >= size ? size - 1 : currentStreamIndex ;
592- BlockOutputStreamEntry entry = streamEntries .get (streamIndex );
593- if (entry != null ) {
594- try {
595- Collection <DatanodeDetails > failedServers = entry .getFailedServers ();
596- // failed servers can be null in case there is no data written in the
597- // stream
598- if (failedServers != null && !failedServers .isEmpty ()) {
599- excludeList .addDatanodes (failedServers );
600- }
601- switch (op ) {
602- case CLOSE :
603- entry .close ();
604- break ;
605- case FULL :
606- if (entry .getRemaining () == 0 ) {
613+ try {
614+ int size = streamEntries .size ();
615+ int streamIndex =
616+ currentStreamIndex >= size ? size - 1 : currentStreamIndex ;
617+ BlockOutputStreamEntry entry = streamEntries .get (streamIndex );
618+ if (entry != null ) {
619+ try {
620+ Collection <DatanodeDetails > failedServers =
621+ entry .getFailedServers ();
622+ // failed servers can be null in case there is no data written in the
623+ // stream
624+ if (failedServers != null && !failedServers .isEmpty ()) {
625+ excludeList .addDatanodes (failedServers );
626+ }
627+ switch (op ) {
628+ case CLOSE :
607629 entry .close ();
608- currentStreamIndex ++;
630+ break ;
631+ case FULL :
632+ if (entry .getRemaining () == 0 ) {
633+ entry .close ();
634+ currentStreamIndex ++;
635+ }
636+ break ;
637+ case FLUSH :
638+ entry .flush ();
639+ break ;
640+ default :
641+ throw new IOException ("Invalid Operation" );
609642 }
610- break ;
611- case FLUSH :
612- entry .flush ();
613- break ;
614- default :
615- throw new IOException ("Invalid Operation" );
643+ } catch (IOException ioe ) {
644+ handleException (entry , streamIndex , ioe );
645+ continue ;
616646 }
617- } catch (IOException ioe ) {
618- handleException (entry , streamIndex , ioe );
619- continue ;
620647 }
648+ break ;
649+ } catch (Exception e ) {
650+ markStreamClosed ();
651+ throw e ;
621652 }
622- break ;
623653 }
624654 }
625655
@@ -658,7 +688,7 @@ public void close() throws IOException {
658688 } catch (IOException ioe ) {
659689 throw ioe ;
660690 } finally {
661- bufferPool . clearBufferPool ();
691+ cleanup ();
662692 }
663693 }
664694
0 commit comments