Skip to content

Commit dfed7ea

Browse files
harshith-212shubhluck
authored andcommitted
Revert "HDFS-3246: pRead equivalent for direct read path (apache#597)"
This reverts commit 57202cc.
1 parent 06e7ca7 commit dfed7ea

File tree

8 files changed

+19
-267
lines changed

8 files changed

+19
-267
lines changed

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/CryptoInputStream.java

Lines changed: 1 addition & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -566,52 +566,7 @@ private void decrypt(ByteBuffer buf, int n, int start)
566566
}
567567
buf.position(pos);
568568
}
569-
570-
private void decrypt(long filePosition, ByteBuffer buf, int length, int start)
571-
throws IOException {
572-
ByteBuffer localInBuffer = null;
573-
ByteBuffer localOutBuffer = null;
574-
575-
// Duplicate the buffer so we don't have to worry about resetting the
576-
// original position and limit at the end of the method
577-
buf = buf.duplicate();
578-
579-
int decryptedBytes = 0;
580-
Decryptor localDecryptor = null;
581-
try {
582-
localInBuffer = getBuffer();
583-
localOutBuffer = getBuffer();
584-
localDecryptor = getDecryptor();
585-
byte[] localIV = initIV.clone();
586-
updateDecryptor(localDecryptor, filePosition, localIV);
587-
byte localPadding = getPadding(filePosition);
588-
// Set proper filePosition for inputdata.
589-
localInBuffer.position(localPadding);
590-
591-
while (decryptedBytes < length) {
592-
buf.position(start + decryptedBytes);
593-
buf.limit(start + decryptedBytes +
594-
Math.min(length - decryptedBytes, localInBuffer.remaining()));
595-
localInBuffer.put(buf);
596-
// Do decryption
597-
try {
598-
decrypt(localDecryptor, localInBuffer, localOutBuffer, localPadding);
599-
buf.position(start + decryptedBytes);
600-
buf.limit(start + length);
601-
decryptedBytes += localOutBuffer.remaining();
602-
buf.put(localOutBuffer);
603-
} finally {
604-
localPadding = afterDecryption(localDecryptor, localInBuffer,
605-
filePosition + length, localIV);
606-
}
607-
}
608-
} finally {
609-
returnBuffer(localInBuffer);
610-
returnBuffer(localOutBuffer);
611-
returnDecryptor(localDecryptor);
612-
}
613-
}
614-
569+
615570
@Override
616571
public int available() throws IOException {
617572
checkStream();

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataInputStream.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
package org.apache.hadoop.fs;
2020

2121
import java.io.DataInputStream;
22-
import java.io.EOFException;
2322
import java.io.FileDescriptor;
2423
import java.io.FileInputStream;
2524
import java.io.IOException;
@@ -267,4 +266,4 @@ public void readFully(long position, ByteBuffer buf) throws IOException {
267266
"unsupported by " + in.getClass().getCanonicalName());
268267
}
269268
}
270-
}
269+
}

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StreamCapabilities.java

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -59,18 +59,6 @@ public interface StreamCapabilities {
5959
*/
6060
String UNBUFFER = "in:unbuffer";
6161

62-
/**
63-
* Stream read(ByteBuffer) capability implemented by
64-
* {@link ByteBufferReadable#read(java.nio.ByteBuffer)}.
65-
*/
66-
String READBYTEBUFFER = "in:readbytebuffer";
67-
68-
/**
69-
* Stream read(long, ByteBuffer) capability implemented by
70-
* {@link ByteBufferPositionedReadable#read(long, java.nio.ByteBuffer)}.
71-
*/
72-
String PREADBYTEBUFFER = "in:preadbytebuffer";
73-
7462
/**
7563
* Capabilities that a stream can support and be queried for.
7664
*/

hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/CryptoStreamsTestBase.java

Lines changed: 10 additions & 175 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626
import java.util.EnumSet;
2727
import java.util.Random;
2828

29-
import org.apache.hadoop.fs.ByteBufferPositionedReadable;
3029
import org.apache.hadoop.fs.ByteBufferReadable;
3130
import org.apache.hadoop.fs.CanUnbuffer;
3231
import org.apache.hadoop.fs.FSDataOutputStream;
@@ -130,32 +129,6 @@ private void preadCheck(PositionedReadable in) throws Exception {
130129
Assert.assertArrayEquals(result, expectedData);
131130
}
132131

133-
private int byteBufferPreadAll(ByteBufferPositionedReadable in,
134-
ByteBuffer buf) throws IOException {
135-
int n = 0;
136-
int total = 0;
137-
while (n != -1) {
138-
total += n;
139-
if (!buf.hasRemaining()) {
140-
break;
141-
}
142-
n = in.read(total, buf);
143-
}
144-
145-
return total;
146-
}
147-
148-
private void byteBufferPreadCheck(ByteBufferPositionedReadable in)
149-
throws Exception {
150-
ByteBuffer result = ByteBuffer.allocate(dataLen);
151-
int n = byteBufferPreadAll(in, result);
152-
153-
Assert.assertEquals(dataLen, n);
154-
ByteBuffer expectedData = ByteBuffer.allocate(n);
155-
expectedData.put(data, 0, n);
156-
Assert.assertArrayEquals(result.array(), expectedData.array());
157-
}
158-
159132
protected OutputStream getOutputStream(int bufferSize) throws IOException {
160133
return getOutputStream(bufferSize, key, iv);
161134
}
@@ -315,36 +288,20 @@ private int readAll(InputStream in, long pos, byte[] b, int off, int len)
315288

316289
return total;
317290
}
318-
319-
private int readAll(InputStream in, long pos, ByteBuffer buf)
320-
throws IOException {
321-
int n = 0;
322-
int total = 0;
323-
while (n != -1) {
324-
total += n;
325-
if (!buf.hasRemaining()) {
326-
break;
327-
}
328-
n = ((ByteBufferPositionedReadable) in).read(pos + total, buf);
329-
}
330-
331-
return total;
332-
}
333291

334292
/** Test positioned read. */
335293
@Test(timeout=120000)
336294
public void testPositionedRead() throws Exception {
337-
try (OutputStream out = getOutputStream(defaultBufferSize)) {
338-
writeData(out);
339-
}
295+
OutputStream out = getOutputStream(defaultBufferSize);
296+
writeData(out);
340297

341-
try (InputStream in = getInputStream(defaultBufferSize)) {
342-
// Pos: 1/3 dataLen
343-
positionedReadCheck(in, dataLen / 3);
298+
InputStream in = getInputStream(defaultBufferSize);
299+
// Pos: 1/3 dataLen
300+
positionedReadCheck(in , dataLen / 3);
344301

345-
// Pos: 1/2 dataLen
346-
positionedReadCheck(in, dataLen / 2);
347-
}
302+
// Pos: 1/2 dataLen
303+
positionedReadCheck(in, dataLen / 2);
304+
in.close();
348305
}
349306

350307
private void positionedReadCheck(InputStream in, int pos) throws Exception {
@@ -358,35 +315,6 @@ private void positionedReadCheck(InputStream in, int pos) throws Exception {
358315
System.arraycopy(data, pos, expectedData, 0, n);
359316
Assert.assertArrayEquals(readData, expectedData);
360317
}
361-
362-
/** Test positioned read with ByteBuffers. */
363-
@Test(timeout=120000)
364-
public void testPositionedReadWithByteBuffer() throws Exception {
365-
try (OutputStream out = getOutputStream(defaultBufferSize)) {
366-
writeData(out);
367-
}
368-
369-
try (InputStream in = getInputStream(defaultBufferSize)) {
370-
// Pos: 1/3 dataLen
371-
positionedReadCheckWithByteBuffer(in, dataLen / 3);
372-
373-
// Pos: 1/2 dataLen
374-
positionedReadCheckWithByteBuffer(in, dataLen / 2);
375-
}
376-
}
377-
378-
private void positionedReadCheckWithByteBuffer(InputStream in, int pos)
379-
throws Exception {
380-
ByteBuffer result = ByteBuffer.allocate(dataLen);
381-
int n = readAll(in, pos, result);
382-
383-
Assert.assertEquals(dataLen, n + pos);
384-
byte[] readData = new byte[n];
385-
System.arraycopy(result.array(), 0, readData, 0, n);
386-
byte[] expectedData = new byte[n];
387-
System.arraycopy(data, pos, expectedData, 0, n);
388-
Assert.assertArrayEquals(readData, expectedData);
389-
}
390318

391319
/** Test read fully. */
392320
@Test(timeout=120000)
@@ -630,40 +558,12 @@ private void byteBufferReadCheck(InputStream in, ByteBuffer buf,
630558
System.arraycopy(data, 0, expectedData, 0, n);
631559
Assert.assertArrayEquals(readData, expectedData);
632560
}
633-
634-
private void byteBufferPreadCheck(InputStream in, ByteBuffer buf,
635-
int bufPos) throws Exception {
636-
// Test reading from position 0
637-
buf.position(bufPos);
638-
int n = ((ByteBufferPositionedReadable) in).read(0, buf);
639-
Assert.assertEquals(bufPos + n, buf.position());
640-
byte[] readData = new byte[n];
641-
buf.rewind();
642-
buf.position(bufPos);
643-
buf.get(readData);
644-
byte[] expectedData = new byte[n];
645-
System.arraycopy(data, 0, expectedData, 0, n);
646-
Assert.assertArrayEquals(readData, expectedData);
647-
648-
// Test reading from half way through the data
649-
buf.position(bufPos);
650-
n = ((ByteBufferPositionedReadable) in).read(dataLen / 2, buf);
651-
Assert.assertEquals(bufPos + n, buf.position());
652-
readData = new byte[n];
653-
buf.rewind();
654-
buf.position(bufPos);
655-
buf.get(readData);
656-
expectedData = new byte[n];
657-
System.arraycopy(data, dataLen / 2, expectedData, 0, n);
658-
Assert.assertArrayEquals(readData, expectedData);
659-
}
660561

661562
/** Test byte buffer read with different buffer size. */
662563
@Test(timeout=120000)
663564
public void testByteBufferRead() throws Exception {
664-
try (OutputStream out = getOutputStream(defaultBufferSize)) {
665-
writeData(out);
666-
}
565+
OutputStream out = getOutputStream(defaultBufferSize);
566+
writeData(out);
667567

668568
// Default buffer size, initial buffer position is 0
669569
InputStream in = getInputStream(defaultBufferSize);
@@ -713,53 +613,6 @@ public void testByteBufferRead() throws Exception {
713613
byteBufferReadCheck(in, buf, 11);
714614
in.close();
715615
}
716-
717-
/** Test byte buffer pread with different buffer size. */
718-
@Test(timeout=120000)
719-
public void testByteBufferPread() throws Exception {
720-
try (OutputStream out = getOutputStream(defaultBufferSize)) {
721-
writeData(out);
722-
}
723-
724-
try (InputStream defaultBuf = getInputStream(defaultBufferSize);
725-
InputStream smallBuf = getInputStream(smallBufferSize)) {
726-
727-
ByteBuffer buf = ByteBuffer.allocate(dataLen + 100);
728-
729-
// Default buffer size, initial buffer position is 0
730-
byteBufferPreadCheck(defaultBuf, buf, 0);
731-
732-
// Default buffer size, initial buffer position is not 0
733-
buf.clear();
734-
byteBufferPreadCheck(defaultBuf, buf, 11);
735-
736-
// Small buffer size, initial buffer position is 0
737-
buf.clear();
738-
byteBufferPreadCheck(smallBuf, buf, 0);
739-
740-
// Small buffer size, initial buffer position is not 0
741-
buf.clear();
742-
byteBufferPreadCheck(smallBuf, buf, 11);
743-
744-
// Test with direct ByteBuffer
745-
buf = ByteBuffer.allocateDirect(dataLen + 100);
746-
747-
// Direct buffer, default buffer size, initial buffer position is 0
748-
byteBufferPreadCheck(defaultBuf, buf, 0);
749-
750-
// Direct buffer, default buffer size, initial buffer position is not 0
751-
buf.clear();
752-
byteBufferPreadCheck(defaultBuf, buf, 11);
753-
754-
// Direct buffer, small buffer size, initial buffer position is 0
755-
buf.clear();
756-
byteBufferPreadCheck(smallBuf, buf, 0);
757-
758-
// Direct buffer, small buffer size, initial buffer position is not 0
759-
buf.clear();
760-
byteBufferPreadCheck(smallBuf, buf, 11);
761-
}
762-
}
763616

764617
@Test(timeout=120000)
765618
public void testCombinedOp() throws Exception {
@@ -997,23 +850,5 @@ public void testUnbuffer() throws Exception {
997850
// The close will be called when exiting this try-with-resource block
998851
}
999852
}
1000-
1001-
// Test ByteBuffer pread
1002-
try (InputStream in = getInputStream(smallBufferSize)) {
1003-
if (in instanceof ByteBufferPositionedReadable) {
1004-
ByteBufferPositionedReadable bbpin = (ByteBufferPositionedReadable) in;
1005-
1006-
// Test unbuffer after pread
1007-
byteBufferPreadCheck(bbpin);
1008-
((CanUnbuffer) in).unbuffer();
1009-
1010-
// Test pread again after unbuffer
1011-
byteBufferPreadCheck(bbpin);
1012-
1013-
// Test close after unbuffer
1014-
((CanUnbuffer) in).unbuffer();
1015-
// The close will be called when exiting this try-with-resource block
1016-
}
1017-
}
1018853
}
1019854
}

hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/TestCryptoStreams.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -429,8 +429,6 @@ public boolean hasCapability(String capability) {
429429
case StreamCapabilities.READAHEAD:
430430
case StreamCapabilities.DROPBEHIND:
431431
case StreamCapabilities.UNBUFFER:
432-
case StreamCapabilities.READBYTEBUFFER:
433-
case StreamCapabilities.PREADBYTEBUFFER:
434432
return true;
435433
default:
436434
return false;

hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/TestCryptoStreamsNormal.java

Lines changed: 6 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -105,32 +105,27 @@ public void testByteBufferReadFully() throws Exception {}
105105
@Override
106106
@Test(timeout=10000)
107107
public void testReadFully() throws IOException {}
108-
108+
109109
@Ignore("Wrapped stream doesn't support Seek")
110110
@Override
111111
@Test(timeout=10000)
112112
public void testSeek() throws IOException {}
113-
113+
114114
@Ignore("Wrapped stream doesn't support ByteBufferRead")
115115
@Override
116116
@Test(timeout=10000)
117117
public void testByteBufferRead() throws IOException {}
118-
119-
@Ignore("Wrapped stream doesn't support ByteBufferPositionedReadable")
120-
@Override
121-
@Test(timeout=10000)
122-
public void testByteBufferPread() throws IOException {}
123-
118+
124119
@Ignore("Wrapped stream doesn't support ByteBufferRead, Seek")
125120
@Override
126121
@Test(timeout=10000)
127122
public void testCombinedOp() throws IOException {}
128-
123+
129124
@Ignore("Wrapped stream doesn't support SeekToNewSource")
130125
@Override
131126
@Test(timeout=10000)
132127
public void testSeekToNewSource() throws IOException {}
133-
128+
134129
@Ignore("Wrapped stream doesn't support HasEnhancedByteBufferAccess")
135130
@Override
136131
@Test(timeout=10000)
@@ -140,4 +135,4 @@ public void testHasEnhancedByteBufferAccess() throws IOException {}
140135
@Override
141136
@Test
142137
public void testUnbuffer() throws Exception {}
143-
}
138+
}

0 commit comments

Comments
 (0)