Skip to content

Commit d60673c

Browse files
committed
HDDS-1257. Incorrect object because of mismatch in block lengths. Contributed by Shashikant Banerjee.
1 parent 983b78a commit d60673c

File tree

1 file changed

+25
-13
lines changed

1 file changed

+25
-13
lines changed

hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java

Lines changed: 25 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
import java.util.List;
4747
import java.util.ArrayList;
4848
import java.util.concurrent.*;
49+
import java.util.stream.Collectors;
4950

5051
import static org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls
5152
.putBlockAsync;
@@ -108,7 +109,10 @@ public class BlockOutputStream extends OutputStream {
108109
CompletableFuture<ContainerProtos.ContainerCommandResponseProto>>
109110
futureMap;
110111
// map containing mapping for putBlock logIndex to to flushedDataLength Map.
111-
private ConcurrentHashMap<Long, Long> commitIndex2flushedDataMap;
112+
113+
// The map should maintain the keys (logIndexes) in order so that while
114+
// removing we always end up updating incremented data flushed length.
115+
private ConcurrentSkipListMap<Long, Long> commitIndex2flushedDataMap;
112116

113117
private List<DatanodeDetails> failedServers;
114118

@@ -157,7 +161,7 @@ public BlockOutputStream(BlockID blockID, String key,
157161

158162
// A single thread executor handle the responses of async requests
159163
responseExecutor = Executors.newSingleThreadExecutor();
160-
commitIndex2flushedDataMap = new ConcurrentHashMap<>();
164+
commitIndex2flushedDataMap = new ConcurrentSkipListMap<>();
161165
totalAckDataLength = 0;
162166
futureMap = new ConcurrentHashMap<>();
163167
totalDataFlushedLength = 0;
@@ -206,7 +210,7 @@ public void write(byte[] b, int off, int len) throws IOException {
206210
int writeLen;
207211

208212
// Allocate a buffer if needed. The buffer will be allocated only
209-
// once as needed and will be reused again for mutiple blockOutputStream
213+
// once as needed and will be reused again for multiple blockOutputStream
210214
// entries.
211215
ByteBuffer currentBuffer = bufferPool.allocateBufferIfNeeded();
212216
int pos = currentBuffer.position();
@@ -281,10 +285,18 @@ public void writeOnRetry(long len) throws IOException {
281285
* just update the totalAckDataLength. In case of failure,
282286
* we will read the data starting from totalAckDataLength.
283287
*/
284-
private void updateFlushIndex(long index) {
285-
if (!commitIndex2flushedDataMap.isEmpty()) {
288+
private void updateFlushIndex(List<Long> indexes) {
289+
Preconditions.checkArgument(!commitIndex2flushedDataMap.isEmpty());
290+
for (long index : indexes) {
286291
Preconditions.checkState(commitIndex2flushedDataMap.containsKey(index));
287-
totalAckDataLength = commitIndex2flushedDataMap.remove(index);
292+
long length = commitIndex2flushedDataMap.remove(index);
293+
294+
// totalAckDataLength replicated yet should always be less than equal to
295+
// the current length being returned from commitIndex2flushedDataMap.
296+
// The below precondition would ensure commitIndex2flushedDataMap entries
297+
// are removed in order of the insertion to the map.
298+
Preconditions.checkArgument(totalAckDataLength < length);
299+
totalAckDataLength = length;
288300
LOG.debug("Total data successfully replicated: " + totalAckDataLength);
289301
futureMap.remove(totalAckDataLength);
290302
// Flush has been committed to required servers successful.
@@ -325,13 +337,13 @@ private void handleFullBuffer() throws IOException {
325337
}
326338

327339
private void adjustBuffers(long commitIndex) {
328-
commitIndex2flushedDataMap.keySet().stream().forEach(index -> {
329-
if (index <= commitIndex) {
330-
updateFlushIndex(index);
331-
} else {
332-
return;
333-
}
334-
});
340+
List<Long> keyList = commitIndex2flushedDataMap.keySet().stream()
341+
.filter(p -> p <= commitIndex).collect(Collectors.toList());
342+
if (keyList.isEmpty()) {
343+
return;
344+
} else {
345+
updateFlushIndex(keyList);
346+
}
335347
}
336348

337349
// It may happen that once the exception is encountered , we still might

0 commit comments

Comments
 (0)