Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,9 @@ abstract public class FSOutputSummer extends OutputStream implements
// data checksum
private final DataChecksum sum;
// internal buffer for storing data before it is checksumed
private byte buf[];
private byte[] buf;
// internal buffer for storing checksum
private byte checksum[];
private byte[] checksum;
// The number of valid bytes in the buffer.
private int count;

Expand Down Expand Up @@ -100,7 +100,7 @@ public synchronized void write(int b) throws IOException {
* @exception IOException if an I/O error occurs.
*/
@Override
public synchronized void write(byte b[], int off, int len)
public synchronized void write(byte[] b, int off, int len)
throws IOException {

checkClosed();
Expand All @@ -117,7 +117,7 @@ public synchronized void write(byte b[], int off, int len)
* Write a portion of an array, flushing to the underlying
* stream at most once if necessary.
*/
private int write1(byte b[], int off, int len) throws IOException {
private int write1(byte[] b, int off, int len) throws IOException {
if(count==0 && len>=buf.length) {
// local buffer is empty and user buffer size >= local buffer size, so
// simply checksum the user buffer and send it directly to the underlying
Expand All @@ -129,7 +129,7 @@ private int write1(byte b[], int off, int len) throws IOException {

// copy user data to local buffer
int bytesToCopy = buf.length-count;
bytesToCopy = (len<bytesToCopy) ? len : bytesToCopy;
bytesToCopy = Math.min(len, bytesToCopy);
System.arraycopy(b, off, buf, count, bytesToCopy);
count += bytesToCopy;
if (count == buf.length) {
Expand Down Expand Up @@ -207,21 +207,16 @@ protected TraceScope createWriteTraceScope() {
/** Generate checksums for the given data chunks and output chunks & checksums
* to the underlying output stream.
*/
private void writeChecksumChunks(byte b[], int off, int len)
private void writeChecksumChunks(byte[] b, int off, int len)
throws IOException {
sum.calculateChunkedSums(b, off, len, checksum, 0);
TraceScope scope = createWriteTraceScope();
try {
try (TraceScope scope = createWriteTraceScope()) {
for (int i = 0; i < len; i += sum.getBytesPerChecksum()) {
int chunkLen = Math.min(sum.getBytesPerChecksum(), len - i);
int ckOffset = i / sum.getBytesPerChecksum() * getChecksumSize();
writeChunk(b, off + i, chunkLen, checksum, ckOffset,
getChecksumSize());
}
} finally {
if (scope != null) {
scope.close();
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -275,10 +275,8 @@ static DFSOutputStream newStreamForCreate(DFSClient dfsClient, String src,

// Retry the create if we get a RetryStartFileException up to a maximum
// number of times
boolean shouldRetry = true;
int retryCount = CREATE_RETRY_COUNT;
while (shouldRetry) {
shouldRetry = false;
while (true) {
try {
stat = dfsClient.namenode.create(src, masked, dfsClient.clientName,
new EnumSetWritable<>(flag), createParent, replication,
Expand All @@ -301,7 +299,6 @@ static DFSOutputStream newStreamForCreate(DFSClient dfsClient, String src,
UnknownCryptoProtocolVersionException.class);
if (e instanceof RetryStartFileException) {
if (retryCount > 0) {
shouldRetry = true;
retryCount--;
} else {
throw new IOException("Too many retries because of encryption" +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
Expand Down Expand Up @@ -97,7 +96,7 @@ static class MultipleBlockingQueue<T> {
MultipleBlockingQueue(int numQueue, int queueSize) {
queues = new ArrayList<>(numQueue);
for (int i = 0; i < numQueue; i++) {
queues.add(new LinkedBlockingQueue<T>(queueSize));
queues.add(new LinkedBlockingQueue<>(queueSize));
}
}

Expand Down Expand Up @@ -480,7 +479,7 @@ private DatanodeInfo[] getExcludedNodes() {
}
}
}
return excluded.toArray(new DatanodeInfo[excluded.size()]);
return excluded.toArray(DatanodeInfo.EMPTY_ARRAY);
}

private void allocateNewBlock() throws IOException {
Expand Down Expand Up @@ -1316,12 +1315,9 @@ void flushAllInternals() throws IOException {
// flush all data to Datanode
final long toWaitFor = flushInternalWithoutWaitingAck();
future = flushAllExecutorCompletionService.submit(
new Callable<Void>() {
@Override
public Void call() throws Exception {
s.waitForAckedSeqno(toWaitFor);
return null;
}
() -> {
s.waitForAckedSeqno(toWaitFor);
return null;
});
flushAllFuturesMap.put(future, i);
} catch (Exception e) {
Expand Down