Skip to content

Commit fd24290

Browse files
committed
HADOOP-18028. High performance S3A input stream (apache#4109)
This is the the initial merge of the HADOOP-18028 S3A performance input stream. This patch on its own is incomplete and must be accompanied by all other commits with HADOOP-18028 in their git commit message. Consult the JIRA for that list Contributed by Bhalchandra Pandit.
1 parent 95a8587 commit fd24290

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

50 files changed

+8331
-13
lines changed

hadoop-tools/hadoop-aws/pom.xml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -468,6 +468,12 @@
468468
<artifactId>aws-java-sdk-bundle</artifactId>
469469
<scope>compile</scope>
470470
</dependency>
471+
<dependency>
472+
<groupId>com.twitter</groupId>
473+
<artifactId>util-core_2.11</artifactId>
474+
<version>21.2.0</version>
475+
<scope>compile</scope>
476+
</dependency>
471477
<dependency>
472478
<groupId>org.assertj</groupId>
473479
<artifactId>assertj-core</artifactId>
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.hadoop.fs.common;
21+
22+
import java.io.Closeable;
23+
import java.io.IOException;
24+
import java.nio.ByteBuffer;
25+
26+
/**
27+
* Provides functionality necessary for caching blocks of data read from FileSystem.
28+
*/
29+
public interface BlockCache extends Closeable {
30+
31+
/**
32+
* Indicates whether the given block is in this cache.
33+
*
34+
* @param blockNumber the id of the given block.
35+
* @return true if the given block is in this cache, false otherwise.
36+
*/
37+
boolean containsBlock(int blockNumber);
38+
39+
/**
40+
* Gets the blocks in this cache.
41+
*
42+
* @return the blocks in this cache.
43+
*/
44+
Iterable<Integer> blocks();
45+
46+
/**
47+
* Gets the number of blocks in this cache.
48+
*
49+
* @return the number of blocks in this cache.
50+
*/
51+
int size();
52+
53+
/**
54+
* Gets the block having the given {@code blockNumber}.
55+
*
56+
* @param blockNumber the id of the desired block.
57+
* @param buffer contents of the desired block are copied to this buffer.
58+
* @throws IOException if there is an error reading the given block.
59+
*/
60+
void get(int blockNumber, ByteBuffer buffer) throws IOException;
61+
62+
/**
63+
* Puts the given block in this cache.
64+
*
65+
* @param blockNumber the id of the given block.
66+
* @param buffer contents of the given block to be added to this cache.
67+
* @throws IOException if there is an error writing the given block.
68+
*/
69+
void put(int blockNumber, ByteBuffer buffer) throws IOException;
70+
}
Lines changed: 248 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,248 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.hadoop.fs.common;
21+
22+
/**
23+
* Holds information about blocks of data in a file.
24+
*/
25+
public class BlockData {
26+
// State of each block of data.
27+
enum State {
28+
// Data is not yet ready to be read from this block (still being prefetched).
29+
NOT_READY,
30+
31+
// A read of this block has been enqueued in the prefetch queue.
32+
QUEUED,
33+
34+
// This block is ready to be read. That is, it has been fully read.
35+
READY,
36+
37+
// This block has been cached in the local disk cache.
38+
CACHED
39+
}
40+
41+
// State of all blocks in a file.
42+
private State[] state;
43+
44+
// The size of a file.
45+
private final long fileSize;
46+
47+
// The file is divided into blocks of this size.
48+
private final int blockSize;
49+
50+
// The file has these many blocks.
51+
private final int numBlocks;
52+
53+
/**
54+
* Constructs an instance of {@link BlockData}.
55+
*
56+
* @param fileSize the size of a file.
57+
* @param blockSize the file is divided into blocks of this size.
58+
*
59+
* @throws IllegalArgumentException if fileSize is negative.
60+
* @throws IllegalArgumentException if blockSize is negative.
61+
* @throws IllegalArgumentException if blockSize is zero or negative.
62+
*/
63+
public BlockData(long fileSize, int blockSize) {
64+
Validate.checkNotNegative(fileSize, "fileSize");
65+
if (fileSize == 0) {
66+
Validate.checkNotNegative(blockSize, "blockSize");
67+
} else {
68+
Validate.checkPositiveInteger(blockSize, "blockSize");
69+
}
70+
71+
this.fileSize = fileSize;
72+
this.blockSize = blockSize;
73+
this.numBlocks =
74+
(fileSize == 0) ? 0 : ((int) (fileSize / blockSize)) + (fileSize % blockSize > 0 ? 1 : 0);
75+
this.state = new State[this.numBlocks];
76+
for (int b = 0; b < this.numBlocks; b++) {
77+
this.setState(b, State.NOT_READY);
78+
}
79+
}
80+
81+
/**
82+
* Gets the size of each block.
83+
*
84+
* @return the size of each block.
85+
*/
86+
public int getBlockSize() {
87+
return this.blockSize;
88+
}
89+
90+
/**
91+
* Gets the size of the associated file.
92+
*
93+
* @return the size of the associated file.
94+
*/
95+
public long getFileSize() {
96+
return this.fileSize;
97+
}
98+
99+
/**
100+
* Gets the number of blocks in the associated file.
101+
*
102+
* @return the number of blocks in the associated file.
103+
*/
104+
public int getNumBlocks() {
105+
return this.numBlocks;
106+
}
107+
108+
/**
109+
* Indicates whether the given block is the last block in the associated file.
110+
*
111+
* @param blockNumber the id of the desired block.
112+
* @return true if the given block is the last block in the associated file, false otherwise.
113+
*
114+
* @throws IllegalArgumentException if blockNumber is invalid.
115+
*/
116+
public boolean isLastBlock(int blockNumber) {
117+
if (this.fileSize == 0) {
118+
return false;
119+
}
120+
121+
throwIfInvalidBlockNumber(blockNumber);
122+
123+
return blockNumber == (this.numBlocks - 1);
124+
}
125+
126+
/**
127+
* Gets the id of the block that contains the given absolute offset.
128+
*
129+
* @param offset the absolute offset to check.
130+
* @return the id of the block that contains the given absolute offset.
131+
*
132+
* @throws IllegalArgumentException if offset is invalid.
133+
*/
134+
public int getBlockNumber(long offset) {
135+
throwIfInvalidOffset(offset);
136+
137+
return (int) (offset / this.blockSize);
138+
}
139+
140+
/**
141+
* Gets the size of the given block.
142+
*
143+
* @param blockNumber the id of the desired block.
144+
* @return the size of the given block.
145+
*/
146+
public int getSize(int blockNumber) {
147+
if (this.fileSize == 0) {
148+
return 0;
149+
}
150+
151+
if (this.isLastBlock(blockNumber)) {
152+
return (int) (this.fileSize - (((long) this.blockSize) * (this.numBlocks - 1)));
153+
} else {
154+
return this.blockSize;
155+
}
156+
}
157+
158+
/**
159+
* Indicates whether the given absolute offset is valid.
160+
*
161+
* @param offset absolute offset in the file..
162+
* @return true if the given absolute offset is valid, false otherwise.
163+
*/
164+
public boolean isValidOffset(long offset) {
165+
return (offset >= 0) && (offset < this.fileSize);
166+
}
167+
168+
/**
169+
* Gets the start offset of the given block.
170+
171+
* @param blockNumber the id of the given block.
172+
* @return the start offset of the given block.
173+
*
174+
* @throws IllegalArgumentException if blockNumber is invalid.
175+
*/
176+
public long getStartOffset(int blockNumber) {
177+
throwIfInvalidBlockNumber(blockNumber);
178+
179+
return blockNumber * (long) this.blockSize;
180+
}
181+
182+
/**
183+
* Gets the relative offset corresponding to the given block and the absolute offset.
184+
*
185+
* @param blockNumber the id of the given block.
186+
* @param offset absolute offset in the file.
187+
* @return the relative offset corresponding to the given block and the absolute offset.
188+
*
189+
* @throws IllegalArgumentException if either blockNumber or offset is invalid.
190+
*/
191+
public int getRelativeOffset(int blockNumber, long offset) {
192+
throwIfInvalidOffset(offset);
193+
194+
return (int) (offset - this.getStartOffset(blockNumber));
195+
}
196+
197+
/**
198+
* Gets the state of the given block.
199+
*
200+
* @param blockNumber the id of the given block.
201+
* @return the state of the given block.
202+
*
203+
* @throws IllegalArgumentException if blockNumber is invalid.
204+
*/
205+
public State getState(int blockNumber) {
206+
throwIfInvalidBlockNumber(blockNumber);
207+
208+
return this.state[blockNumber];
209+
}
210+
211+
/**
212+
* Sets the state of the given block to the given value.
213+
*
214+
* @param blockNumber the id of the given block.
215+
* @param blockState the target state.
216+
*
217+
* @throws IllegalArgumentException if blockNumber is invalid.
218+
*/
219+
public void setState(int blockNumber, State blockState) {
220+
throwIfInvalidBlockNumber(blockNumber);
221+
222+
this.state[blockNumber] = blockState;
223+
}
224+
225+
// Debug helper.
226+
public String getStateString() {
227+
StringBuilder sb = new StringBuilder();
228+
int blockNumber = 0;
229+
while (blockNumber < this.numBlocks) {
230+
State tstate = this.getState(blockNumber);
231+
int endBlockNumber = blockNumber;
232+
while ((endBlockNumber < this.numBlocks) && (this.getState(endBlockNumber) == tstate)) {
233+
endBlockNumber++;
234+
}
235+
sb.append(String.format("[%03d ~ %03d] %s%n", blockNumber, endBlockNumber - 1, tstate));
236+
blockNumber = endBlockNumber;
237+
}
238+
return sb.toString();
239+
}
240+
241+
private void throwIfInvalidBlockNumber(int blockNumber) {
242+
Validate.checkWithinRange(blockNumber, "blockNumber", 0, this.numBlocks - 1);
243+
}
244+
245+
private void throwIfInvalidOffset(long offset) {
246+
Validate.checkWithinRange(offset, "offset", 0, this.fileSize - 1);
247+
}
248+
}

0 commit comments

Comments
 (0)