diff --git a/paimon-common/src/main/java/org/apache/paimon/compression/BlockCompressionType.java b/paimon-common/src/main/java/org/apache/paimon/compression/BlockCompressionType.java index 51e9f8340fcb..03b726c85939 100644 --- a/paimon-common/src/main/java/org/apache/paimon/compression/BlockCompressionType.java +++ b/paimon-common/src/main/java/org/apache/paimon/compression/BlockCompressionType.java @@ -18,6 +18,8 @@ package org.apache.paimon.compression; +import java.util.Arrays; + /** Block Compression type. */ public enum BlockCompressionType { NONE(0), @@ -45,4 +47,16 @@ public static BlockCompressionType getCompressionTypeByPersistentId(int persiste throw new IllegalArgumentException("Unknown persistentId " + persistentId); } + + public static BlockCompressionType getCompressionTypeByValue(String value) { + BlockCompressionType[] types = values(); + for (BlockCompressionType type : types) { + if (type.name().equalsIgnoreCase(value)) { + return type; + } + } + + throw new IllegalArgumentException( + "Unknown type " + value + ", options are: " + Arrays.toString(types)); + } } diff --git a/paimon-common/src/main/java/org/apache/paimon/data/serializer/RowCompactedSerializer.java b/paimon-common/src/main/java/org/apache/paimon/data/serializer/RowCompactedSerializer.java index 30c93c30b290..0dd9385432be 100644 --- a/paimon-common/src/main/java/org/apache/paimon/data/serializer/RowCompactedSerializer.java +++ b/paimon-common/src/main/java/org/apache/paimon/data/serializer/RowCompactedSerializer.java @@ -150,11 +150,11 @@ public byte[] serializeToBytes(InternalRow record) { return rowWriter.copyBuffer(); } - public InternalRow deserialize(byte[] bytes) { + public InternalRow deserialize(MemorySlice memorySlice) { if (rowReader == null) { rowReader = new RowReader(calculateBitSetInBytes(getters.length)); } - rowReader.pointTo(bytes); + rowReader.pointTo(memorySlice.segment(), memorySlice.offset()); GenericRow row = new GenericRow(readers.length); row.setRowKind(rowReader.readRowKind()); for (int i = 0; i < readers.length; i++) { @@ -163,6 +163,10 @@ public InternalRow deserialize(byte[] bytes) { return row; } + public InternalRow deserialize(byte[] bytes) { + return deserialize(MemorySlice.wrap(bytes)); + } + public Comparator createSliceComparator() { return new SliceComparator(rowType); } diff --git a/paimon-common/src/main/java/org/apache/paimon/format/FileFormat.java b/paimon-common/src/main/java/org/apache/paimon/format/FileFormat.java index b6c7d4ee5850..4287bc5f038d 100644 --- a/paimon-common/src/main/java/org/apache/paimon/format/FileFormat.java +++ b/paimon-common/src/main/java/org/apache/paimon/format/FileFormat.java @@ -74,9 +74,31 @@ public FormatReaderFactory createReaderFactory( return createReaderFactory(dataSchemaRowType, projectedRowType, filters); } + /** + * Same as {@link FileFormat#createReaderFactory(RowType, RowType, List)}, but for file formats + * which need to store keys and values separately. + */ + public FormatReaderFactory createReaderFactory( + RowType dataSchemaRowType, + RowType projectedRowType, + @Nullable List filters, + RowType keyType, + RowType valueType) { + return createReaderFactory(dataSchemaRowType, projectedRowType, filters); + } + /** Create a {@link FormatWriterFactory} from the type. */ public abstract FormatWriterFactory createWriterFactory(RowType type); + /** + * Same as {@link FileFormat#createWriterFactory(RowType)}, but for file formats which need to + * store keys and values separately. + */ + public FormatWriterFactory createWriterFactory( + RowType type, RowType keyType, RowType valueType) { + return createWriterFactory(type); + } + /** Validate data field type supported or not. */ public abstract void validateDataFields(RowType rowType); diff --git a/paimon-common/src/main/java/org/apache/paimon/io/cache/CacheKey.java b/paimon-common/src/main/java/org/apache/paimon/io/cache/CacheKey.java index 11b8beb22c55..86bb6b5d32b1 100644 --- a/paimon-common/src/main/java/org/apache/paimon/io/cache/CacheKey.java +++ b/paimon-common/src/main/java/org/apache/paimon/io/cache/CacheKey.java @@ -18,6 +18,8 @@ package org.apache.paimon.io.cache; +import org.apache.paimon.fs.Path; + import java.io.RandomAccessFile; import java.util.Objects; @@ -28,6 +30,10 @@ static CacheKey forPosition(RandomAccessFile file, long position, int length, bo return new PositionCacheKey(file, position, length, isIndex); } + static CacheKey forPosition(Path path, long position, int length, boolean isIndex) { + return new PathPositionCacheKey(path, position, length, isIndex); + } + static CacheKey forPageIndex(RandomAccessFile file, int pageSize, int pageIndex) { return new PageIndexCacheKey(file, pageSize, pageIndex, false); } @@ -122,4 +128,45 @@ public int hashCode() { return Objects.hash(file, pageSize, pageIndex, isIndex); } } + + /** The {@link CacheKey} for a remote file position and length. */ + class PathPositionCacheKey implements CacheKey { + + private final Path remotePath; + private final long position; + private final int length; + private final boolean isIndex; + + private PathPositionCacheKey(Path remotePath, long position, int length, boolean isIndex) { + this.remotePath = remotePath; + this.position = position; + this.length = length; + this.isIndex = isIndex; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + PathPositionCacheKey that = (PathPositionCacheKey) o; + return position == that.position + && length == that.length + && isIndex == that.isIndex + && Objects.equals(remotePath, that.remotePath); + } + + @Override + public int hashCode() { + return Objects.hash(remotePath, position, length, isIndex); + } + + @Override + public boolean isIndex() { + return isIndex; + } + } } diff --git a/paimon-common/src/main/java/org/apache/paimon/lookup/sort/Footer.java b/paimon-common/src/main/java/org/apache/paimon/lookup/sort/Footer.java deleted file mode 100644 index b8ae51789189..000000000000 --- a/paimon-common/src/main/java/org/apache/paimon/lookup/sort/Footer.java +++ /dev/null @@ -1,101 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.paimon.lookup.sort; - -import org.apache.paimon.memory.MemorySlice; -import org.apache.paimon.memory.MemorySliceInput; -import org.apache.paimon.memory.MemorySliceOutput; - -import javax.annotation.Nullable; - -import java.io.IOException; - -import static org.apache.paimon.lookup.sort.SortLookupStoreWriter.MAGIC_NUMBER; -import static org.apache.paimon.utils.Preconditions.checkArgument; - -/** Footer for a sorted file. */ -public class Footer { - - public static final int ENCODED_LENGTH = 36; - - @Nullable private final BloomFilterHandle bloomFilterHandle; - private final BlockHandle indexBlockHandle; - - Footer(@Nullable BloomFilterHandle bloomFilterHandle, BlockHandle indexBlockHandle) { - this.bloomFilterHandle = bloomFilterHandle; - this.indexBlockHandle = indexBlockHandle; - } - - @Nullable - public BloomFilterHandle getBloomFilterHandle() { - return bloomFilterHandle; - } - - public BlockHandle getIndexBlockHandle() { - return indexBlockHandle; - } - - public static Footer readFooter(MemorySliceInput sliceInput) throws IOException { - // read bloom filter and index handles - @Nullable - BloomFilterHandle bloomFilterHandle = - new BloomFilterHandle( - sliceInput.readLong(), sliceInput.readInt(), sliceInput.readLong()); - if (bloomFilterHandle.offset() == 0 - && bloomFilterHandle.size() == 0 - && bloomFilterHandle.expectedEntries() == 0) { - bloomFilterHandle = null; - } - BlockHandle indexBlockHandle = new BlockHandle(sliceInput.readLong(), sliceInput.readInt()); - - // skip padding - sliceInput.setPosition(ENCODED_LENGTH - 4); - - // verify magic number - int magicNumber = sliceInput.readInt(); - checkArgument(magicNumber == MAGIC_NUMBER, "File is not a table (bad magic number)"); - - return new Footer(bloomFilterHandle, indexBlockHandle); - } - - public static MemorySlice writeFooter(Footer footer) { - MemorySliceOutput output = new MemorySliceOutput(ENCODED_LENGTH); - writeFooter(footer, output); - return output.toSlice(); - } - - public static void writeFooter(Footer footer, MemorySliceOutput sliceOutput) { - // write bloom filter and index handles - if (footer.bloomFilterHandle == null) { - sliceOutput.writeLong(0); - sliceOutput.writeInt(0); - sliceOutput.writeLong(0); - } else { - sliceOutput.writeLong(footer.bloomFilterHandle.offset()); - sliceOutput.writeInt(footer.bloomFilterHandle.size()); - sliceOutput.writeLong(footer.bloomFilterHandle.expectedEntries()); - } - - sliceOutput.writeLong(footer.indexBlockHandle.offset()); - sliceOutput.writeInt(footer.indexBlockHandle.size()); - - // write magic number - sliceOutput.writeInt(MAGIC_NUMBER); - } -} diff --git a/paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortLookupStoreReader.java b/paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortLookupStoreReader.java index fd068baaa27b..0661cc97cec3 100644 --- a/paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortLookupStoreReader.java +++ b/paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortLookupStoreReader.java @@ -18,16 +18,15 @@ package org.apache.paimon.lookup.sort; -import org.apache.paimon.compression.BlockCompressionFactory; -import org.apache.paimon.compression.BlockDecompressor; -import org.apache.paimon.io.PageFileInput; +import org.apache.paimon.fs.FileIO; +import org.apache.paimon.fs.Path; +import org.apache.paimon.fs.SeekableInputStream; +import org.apache.paimon.fs.local.LocalFileIO; import org.apache.paimon.io.cache.CacheManager; import org.apache.paimon.lookup.LookupStoreReader; -import org.apache.paimon.memory.MemorySegment; import org.apache.paimon.memory.MemorySlice; -import org.apache.paimon.memory.MemorySliceInput; -import org.apache.paimon.utils.FileBasedBloomFilter; -import org.apache.paimon.utils.MurmurHashUtils; +import org.apache.paimon.sst.BlockCache; +import org.apache.paimon.sst.SstFileLookupReader; import javax.annotation.Nullable; @@ -35,142 +34,39 @@ import java.io.IOException; import java.util.Comparator; -import static org.apache.paimon.lookup.sort.SortLookupStoreUtils.crc32c; -import static org.apache.paimon.utils.Preconditions.checkArgument; - /** A {@link LookupStoreReader} for sort store. */ public class SortLookupStoreReader implements LookupStoreReader { - private final Comparator comparator; - private final String filePath; - private final long fileSize; - - private final BlockIterator indexBlockIterator; - @Nullable private FileBasedBloomFilter bloomFilter; - private final BlockCache blockCache; - private final PageFileInput fileInput; + private final SstFileLookupReader sstFileLookupReader; + private final FileIO fileIO; + private final SeekableInputStream inputStream; public SortLookupStoreReader( Comparator comparator, File file, int blockSize, CacheManager cacheManager) throws IOException { - this.comparator = comparator; - this.filePath = file.getAbsolutePath(); - this.fileSize = file.length(); - - this.fileInput = PageFileInput.create(file, blockSize, null, fileSize, null); - this.blockCache = new BlockCache(fileInput.file(), cacheManager); - Footer footer = readFooter(); - this.indexBlockIterator = readBlock(footer.getIndexBlockHandle(), true).iterator(); - BloomFilterHandle handle = footer.getBloomFilterHandle(); - if (handle != null) { - this.bloomFilter = - new FileBasedBloomFilter( - fileInput, - cacheManager, - handle.expectedEntries(), - handle.offset(), - handle.size()); - } - } - - private Footer readFooter() throws IOException { - MemorySegment footerData = - blockCache.getBlock( - fileSize - Footer.ENCODED_LENGTH, Footer.ENCODED_LENGTH, b -> b, true); - return Footer.readFooter(MemorySlice.wrap(footerData).toInput()); + final Path filePath = new Path(file.getAbsolutePath()); + + this.fileIO = LocalFileIO.create(); + this.inputStream = fileIO.newInputStream(filePath); + this.sstFileLookupReader = + new SstFileLookupReader( + inputStream, + comparator, + file.length(), + filePath, + new BlockCache(filePath, inputStream, cacheManager)); } @Nullable @Override public byte[] lookup(byte[] key) throws IOException { - if (bloomFilter != null && !bloomFilter.testHash(MurmurHashUtils.hashBytes(key))) { - return null; - } - - MemorySlice keySlice = MemorySlice.wrap(key); - // seek the index to the block containing the key - indexBlockIterator.seekTo(keySlice); - - // if indexIterator does not have a next, it means the key does not exist in this iterator - if (indexBlockIterator.hasNext()) { - // seek the current iterator to the key - BlockIterator current = getNextBlock(); - if (current.seekTo(keySlice)) { - return current.next().getValue().copyBytes(); - } - } - return null; - } - - private BlockIterator getNextBlock() { - // index block handle, point to the key, value position. - MemorySlice blockHandle = indexBlockIterator.next().getValue(); - BlockReader dataBlock = - readBlock(BlockHandle.readBlockHandle(blockHandle.toInput()), false); - return dataBlock.iterator(); - } - - /** - * @param blockHandle The block handle. - * @param index Whether read the block as an index. - * @return The reader of the target block. - */ - private BlockReader readBlock(BlockHandle blockHandle, boolean index) { - // read block trailer - MemorySegment trailerData = - blockCache.getBlock( - blockHandle.offset() + blockHandle.size(), - BlockTrailer.ENCODED_LENGTH, - b -> b, - true); - BlockTrailer blockTrailer = - BlockTrailer.readBlockTrailer(MemorySlice.wrap(trailerData).toInput()); - - MemorySegment unCompressedBlock = - blockCache.getBlock( - blockHandle.offset(), - blockHandle.size(), - bytes -> decompressBlock(bytes, blockTrailer), - index); - return new BlockReader(MemorySlice.wrap(unCompressedBlock), comparator); - } - - private byte[] decompressBlock(byte[] compressedBytes, BlockTrailer blockTrailer) { - MemorySegment compressed = MemorySegment.wrap(compressedBytes); - int crc32cCode = crc32c(compressed, blockTrailer.getCompressionType()); - checkArgument( - blockTrailer.getCrc32c() == crc32cCode, - String.format( - "Expected CRC32C(%d) but found CRC32C(%d) for file(%s)", - blockTrailer.getCrc32c(), crc32cCode, filePath)); - - // decompress data - BlockCompressionFactory compressionFactory = - BlockCompressionFactory.create(blockTrailer.getCompressionType()); - if (compressionFactory == null) { - return compressedBytes; - } else { - MemorySliceInput compressedInput = MemorySlice.wrap(compressed).toInput(); - byte[] uncompressed = new byte[compressedInput.readVarLenInt()]; - BlockDecompressor decompressor = compressionFactory.getDecompressor(); - int uncompressedLength = - decompressor.decompress( - compressed.getHeapMemory(), - compressedInput.position(), - compressedInput.available(), - uncompressed, - 0); - checkArgument(uncompressedLength == uncompressed.length); - return uncompressed; - } + return sstFileLookupReader.lookup(key); } @Override public void close() throws IOException { - if (bloomFilter != null) { - bloomFilter.close(); - } - blockCache.close(); - fileInput.close(); + sstFileLookupReader.close(); + inputStream.close(); + fileIO.close(); } } diff --git a/paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortLookupStoreWriter.java b/paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortLookupStoreWriter.java index 31f09cfbc65e..eec936e864a9 100644 --- a/paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortLookupStoreWriter.java +++ b/paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortLookupStoreWriter.java @@ -19,185 +19,47 @@ package org.apache.paimon.lookup.sort; import org.apache.paimon.compression.BlockCompressionFactory; -import org.apache.paimon.compression.BlockCompressionType; -import org.apache.paimon.compression.BlockCompressor; +import org.apache.paimon.fs.FileIO; +import org.apache.paimon.fs.Path; +import org.apache.paimon.fs.PositionOutputStream; +import org.apache.paimon.fs.local.LocalFileIO; import org.apache.paimon.lookup.LookupStoreWriter; -import org.apache.paimon.memory.MemorySegment; -import org.apache.paimon.memory.MemorySlice; -import org.apache.paimon.options.MemorySize; +import org.apache.paimon.sst.SstFileWriter; import org.apache.paimon.utils.BloomFilter; -import org.apache.paimon.utils.MurmurHashUtils; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import javax.annotation.Nullable; -import java.io.BufferedOutputStream; import java.io.File; import java.io.IOException; -import java.nio.file.Files; - -import static org.apache.paimon.lookup.sort.BlockHandle.writeBlockHandle; -import static org.apache.paimon.lookup.sort.SortLookupStoreUtils.crc32c; -import static org.apache.paimon.memory.MemorySegmentUtils.allocateReuseBytes; -import static org.apache.paimon.utils.VarLengthIntUtils.encodeInt; /** A {@link LookupStoreWriter} for sorting. */ public class SortLookupStoreWriter implements LookupStoreWriter { + private final SstFileWriter sstFileWriter; + private final FileIO fileIO; + private final PositionOutputStream out; - private static final Logger LOG = - LoggerFactory.getLogger(SortLookupStoreWriter.class.getName()); - - public static final int MAGIC_NUMBER = 1481571681; - - private final BufferedOutputStream fileOutputStream; - private final int blockSize; - private final BlockWriter dataBlockWriter; - private final BlockWriter indexBlockWriter; - @Nullable private final BloomFilter.Builder bloomFilter; - private final BlockCompressionType compressionType; - @Nullable private final BlockCompressor blockCompressor; - - private byte[] lastKey; - private long position; - - private long recordCount; - private long totalUncompressedSize; - private long totalCompressedSize; - - SortLookupStoreWriter( + public SortLookupStoreWriter( File file, int blockSize, @Nullable BloomFilter.Builder bloomFilter, @Nullable BlockCompressionFactory compressionFactory) throws IOException { - this.fileOutputStream = new BufferedOutputStream(Files.newOutputStream(file.toPath())); - this.blockSize = blockSize; - this.dataBlockWriter = new BlockWriter((int) (blockSize * 1.1)); - int expectedNumberOfBlocks = 1024; - this.indexBlockWriter = - new BlockWriter(BlockHandle.MAX_ENCODED_LENGTH * expectedNumberOfBlocks); - this.bloomFilter = bloomFilter; - if (compressionFactory == null) { - this.compressionType = BlockCompressionType.NONE; - this.blockCompressor = null; - } else { - this.compressionType = compressionFactory.getCompressionType(); - this.blockCompressor = compressionFactory.getCompressor(); - } + Path filePath = new Path(file.getAbsolutePath()); + + this.fileIO = LocalFileIO.create(); + this.out = fileIO.newOutputStream(filePath, true); + this.sstFileWriter = new SstFileWriter(out, blockSize, bloomFilter, compressionFactory); } @Override public void put(byte[] key, byte[] value) throws IOException { - dataBlockWriter.add(key, value); - if (bloomFilter != null) { - bloomFilter.addHash(MurmurHashUtils.hashBytes(key)); - } - - lastKey = key; - - if (dataBlockWriter.memory() > blockSize) { - flush(); - } - - recordCount++; - } - - private void flush() throws IOException { - if (dataBlockWriter.size() == 0) { - return; - } - - BlockHandle blockHandle = writeBlock(dataBlockWriter); - MemorySlice handleEncoding = writeBlockHandle(blockHandle); - indexBlockWriter.add(lastKey, handleEncoding.copyBytes()); - } - - private BlockHandle writeBlock(BlockWriter blockWriter) throws IOException { - // close the block - MemorySlice block = blockWriter.finish(); - - totalUncompressedSize += block.length(); - - // attempt to compress the block - BlockCompressionType blockCompressionType = BlockCompressionType.NONE; - if (blockCompressor != null) { - int maxCompressedSize = blockCompressor.getMaxCompressedSize(block.length()); - byte[] compressed = allocateReuseBytes(maxCompressedSize + 5); - int offset = encodeInt(compressed, 0, block.length()); - int compressedSize = - offset - + blockCompressor.compress( - block.getHeapMemory(), - block.offset(), - block.length(), - compressed, - offset); - - // Don't use the compressed data if compressed less than 12.5%, - if (compressedSize < block.length() - (block.length() / 8)) { - block = new MemorySlice(MemorySegment.wrap(compressed), 0, compressedSize); - blockCompressionType = this.compressionType; - } - } - - totalCompressedSize += block.length(); - - // create block trailer - BlockTrailer blockTrailer = - new BlockTrailer(blockCompressionType, crc32c(block, blockCompressionType)); - MemorySlice trailer = BlockTrailer.writeBlockTrailer(blockTrailer); - - // create a handle to this block - BlockHandle blockHandle = new BlockHandle(position, block.length()); - - // write data - writeSlice(block); - - // write trailer: 5 bytes - writeSlice(trailer); - - // clean up state - blockWriter.reset(); - - return blockHandle; + this.sstFileWriter.put(key, value); } @Override public void close() throws IOException { - // flush current data block - flush(); - - LOG.info("Number of record: {}", recordCount); - - // write bloom filter - @Nullable BloomFilterHandle bloomFilterHandle = null; - if (bloomFilter != null) { - MemorySegment buffer = bloomFilter.getBuffer(); - bloomFilterHandle = - new BloomFilterHandle(position, buffer.size(), bloomFilter.expectedEntries()); - writeSlice(MemorySlice.wrap(buffer)); - LOG.info("Bloom filter size: {} bytes", bloomFilter.getBuffer().size()); - } - - // write index block - BlockHandle indexBlockHandle = writeBlock(indexBlockWriter); - - // write footer - Footer footer = new Footer(bloomFilterHandle, indexBlockHandle); - MemorySlice footerEncoding = Footer.writeFooter(footer); - writeSlice(footerEncoding); - - // close file - fileOutputStream.close(); - - LOG.info("totalUncompressedSize: {}", MemorySize.ofBytes(totalUncompressedSize)); - LOG.info("totalCompressedSize: {}", MemorySize.ofBytes(totalCompressedSize)); - } - - private void writeSlice(MemorySlice slice) throws IOException { - fileOutputStream.write(slice.getHeapMemory(), slice.offset(), slice.length()); - position += slice.length(); + sstFileWriter.close(); + out.close(); + fileIO.close(); } } diff --git a/paimon-common/src/main/java/org/apache/paimon/memory/MemorySliceInput.java b/paimon-common/src/main/java/org/apache/paimon/memory/MemorySliceInput.java index 4b9cd8c044a5..edd3ed1288ba 100644 --- a/paimon-common/src/main/java/org/apache/paimon/memory/MemorySliceInput.java +++ b/paimon-common/src/main/java/org/apache/paimon/memory/MemorySliceInput.java @@ -99,4 +99,8 @@ public MemorySlice readSlice(int length) { position += length; return newSlice; } + + public MemorySlice getSlice() { + return slice; + } } diff --git a/paimon-common/src/main/java/org/apache/paimon/sst/AbstractSstFileReader.java b/paimon-common/src/main/java/org/apache/paimon/sst/AbstractSstFileReader.java new file mode 100644 index 000000000000..6cd1c42aac85 --- /dev/null +++ b/paimon-common/src/main/java/org/apache/paimon/sst/AbstractSstFileReader.java @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.sst; + +import org.apache.paimon.annotation.VisibleForTesting; +import org.apache.paimon.compression.BlockCompressionFactory; +import org.apache.paimon.compression.BlockDecompressor; +import org.apache.paimon.fs.Path; +import org.apache.paimon.fs.SeekableInputStream; +import org.apache.paimon.memory.MemorySegment; +import org.apache.paimon.memory.MemorySlice; +import org.apache.paimon.memory.MemorySliceInput; +import org.apache.paimon.utils.BloomFilter; +import org.apache.paimon.utils.IOUtils; + +import javax.annotation.Nullable; + +import java.io.Closeable; +import java.io.IOException; +import java.util.Comparator; +import java.util.function.Function; + +import static org.apache.paimon.sst.SstFileUtils.crc32c; +import static org.apache.paimon.utils.Preconditions.checkArgument; + +/** + * An SST FileReader which serves point queries and range queries. The implementation directly + * processes bytes. + * + *

Note that this class is NOT thread-safe. + */ +public abstract class AbstractSstFileReader implements Closeable { + + private final long fileSize; + private final FileInfo fileInfo; + private final SeekableInputStream input; + private final BlockCompressionFactory compressionFactory; + + protected final Comparator comparator; + protected final Path filePath; + protected final Footer footer; + protected final MemorySlice firstKey; + protected final BlockIterator indexBlockIterator; + @Nullable protected final BloomFilter bloomFilter; + /** Block cache can be useful in lookup or sequential scan with pre-fetch. */ + @Nullable protected final BlockCache blockCache; + + public AbstractSstFileReader( + SeekableInputStream input, + Comparator comparator, + long fileSize, + Path filePath, + @Nullable BlockCache blockCache) + throws IOException { + this.comparator = comparator; + this.fileSize = fileSize; + this.filePath = filePath; + this.input = input; + this.blockCache = blockCache; + + Footer footer = + Footer.readFooter( + readSlice(fileSize - Footer.ENCODED_LENGTH, Footer.ENCODED_LENGTH, true) + .toInput()); + this.footer = footer; + this.compressionFactory = BlockCompressionFactory.create(footer.getCompressionType()); + this.indexBlockIterator = readBlock(footer.getIndexBlockHandle()).iterator(); + this.bloomFilter = readBloomFilter(footer.getBloomFilterHandle()); + BlockHandle fileInfoHandle = footer.getFileInfoHandle(); + this.fileInfo = + FileInfo.readFileInfo( + readSlice(fileInfoHandle.offset(), fileInfoHandle.size(), true).toInput()); + this.firstKey = + this.fileInfo.getFirstKey() == null + ? null + : MemorySlice.wrap(this.fileInfo.getFirstKey()); + } + + private BloomFilter readBloomFilter(BloomFilterHandle bloomFilterHandle) throws IOException { + // todo: replace with `FileBasedBloomFilter` to refresh cache on visit + BloomFilter bloomFilter = null; + if (bloomFilterHandle != null) { + MemorySegment memorySegment = + readSlice(bloomFilterHandle.offset(), bloomFilterHandle.size(), true).segment(); + bloomFilter = + new BloomFilter(bloomFilterHandle.expectedEntries(), memorySegment.size()); + bloomFilter.setMemorySegment(memorySegment, 0); + } + return bloomFilter; + } + + protected BlockIterator getNextBlock(BlockIterator indexBlockIterator) throws IOException { + IndexEntryHandle entryHandle = + IndexEntryHandle.read(indexBlockIterator.next().getValue().toInput()); + BlockReader dataBlock = + readBlock(new BlockHandle(entryHandle.getOffset(), entryHandle.getSize())); + return dataBlock.iterator(); + } + + private MemorySlice readSlice(long offset, int size, boolean isIndex) throws IOException { + return readSlice(offset, size, Function.identity(), isIndex); + } + + private MemorySlice readSlice( + long offset, int size, Function decompressFunc, boolean isIndex) + throws IOException { + if (blockCache != null) { + return MemorySlice.wrap(blockCache.getBlock(offset, size, decompressFunc, isIndex)); + } + input.seek(offset); + byte[] sliceBytes = new byte[size]; + IOUtils.readFully(input, sliceBytes); + return MemorySlice.wrap(decompressFunc.apply(sliceBytes)); + } + + protected BlockReader readBlock(BlockHandle blockHandle) throws IOException { + // todo: reuse dataBlock in scan + // 1. read trailer + MemorySlice trailerSlice = + readSlice( + blockHandle.offset() + blockHandle.size(), + BlockTrailer.ENCODED_LENGTH, + true); + BlockTrailer blockTrailer = BlockTrailer.readBlockTrailer(trailerSlice.toInput()); + // 2. read block + MemorySlice blockSlice = + readSlice( + blockHandle.offset(), + blockHandle.size(), + data -> decompressBlock(data, blockTrailer), + false); + return new BlockReader(blockSlice, comparator); + } + + private byte[] decompressBlock(byte[] compressedBytes, BlockTrailer blockTrailer) { + MemorySegment compressed = MemorySegment.wrap(compressedBytes); + int crc32cCode = crc32c(compressed); + checkArgument( + blockTrailer.getCrc32c() == crc32cCode, + String.format( + "Expected CRC32C(%d) but found CRC32C(%d) for file(%s)", + blockTrailer.getCrc32c(), crc32cCode, filePath)); + + if (compressionFactory == null) { + return compressedBytes; + } else { + MemorySliceInput compressedInput = MemorySlice.wrap(compressed).toInput(); + byte[] uncompressed = new byte[compressedInput.readVarLenInt()]; + BlockDecompressor decompressor = compressionFactory.getDecompressor(); + int uncompressedLength = + decompressor.decompress( + compressed.getHeapMemory(), + compressedInput.position(), + compressedInput.available(), + uncompressed, + 0); + checkArgument(uncompressedLength == uncompressed.length); + return uncompressed; + } + } + + @Override + public void close() throws IOException { + if (blockCache != null) { + blockCache.close(); + } + } + + @VisibleForTesting + public BlockIterator getIndexBlockIterator() { + return indexBlockIterator; + } +} diff --git a/paimon-common/src/main/java/org/apache/paimon/lookup/sort/BlockAlignedType.java b/paimon-common/src/main/java/org/apache/paimon/sst/BlockAlignedType.java similarity index 97% rename from paimon-common/src/main/java/org/apache/paimon/lookup/sort/BlockAlignedType.java rename to paimon-common/src/main/java/org/apache/paimon/sst/BlockAlignedType.java index e5849d9f750d..43387c9d631e 100644 --- a/paimon-common/src/main/java/org/apache/paimon/lookup/sort/BlockAlignedType.java +++ b/paimon-common/src/main/java/org/apache/paimon/sst/BlockAlignedType.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.paimon.lookup.sort; +package org.apache.paimon.sst; /** Aligned type for block. */ public enum BlockAlignedType { diff --git a/paimon-common/src/main/java/org/apache/paimon/lookup/sort/BlockCache.java b/paimon-common/src/main/java/org/apache/paimon/sst/BlockCache.java similarity index 80% rename from paimon-common/src/main/java/org/apache/paimon/lookup/sort/BlockCache.java rename to paimon-common/src/main/java/org/apache/paimon/sst/BlockCache.java index 0441a24f220e..1fd895f9a6b6 100644 --- a/paimon-common/src/main/java/org/apache/paimon/lookup/sort/BlockCache.java +++ b/paimon-common/src/main/java/org/apache/paimon/sst/BlockCache.java @@ -16,18 +16,18 @@ * limitations under the License. */ -package org.apache.paimon.lookup.sort; +package org.apache.paimon.sst; +import org.apache.paimon.fs.Path; +import org.apache.paimon.fs.SeekableInputStream; import org.apache.paimon.io.cache.CacheKey; import org.apache.paimon.io.cache.CacheManager; import org.apache.paimon.io.cache.CacheManager.SegmentContainer; import org.apache.paimon.memory.MemorySegment; +import org.apache.paimon.utils.IOUtils; import java.io.Closeable; import java.io.IOException; -import java.io.RandomAccessFile; -import java.nio.ByteBuffer; -import java.nio.channels.FileChannel; import java.util.HashMap; import java.util.HashSet; import java.util.Map; @@ -37,32 +37,29 @@ /** Cache for block reading. */ public class BlockCache implements Closeable { - private final RandomAccessFile file; - private final FileChannel channel; + private final SeekableInputStream input; private final CacheManager cacheManager; private final Map blocks; + private final Path sstFilePath; - public BlockCache(RandomAccessFile file, CacheManager cacheManager) { - this.file = file; - this.channel = this.file.getChannel(); + public BlockCache(Path sstFilePath, SeekableInputStream input, CacheManager cacheManager) { + this.input = input; this.cacheManager = cacheManager; this.blocks = new HashMap<>(); + this.sstFilePath = sstFilePath; } private byte[] readFrom(long offset, int length) throws IOException { byte[] buffer = new byte[length]; - int read = channel.read(ByteBuffer.wrap(buffer), offset); - - if (read != length) { - throw new IOException("Could not read all the data"); - } + input.seek(offset); + IOUtils.readFully(input, buffer, 0, length); return buffer; } public MemorySegment getBlock( long position, int length, Function decompressFunc, boolean isIndex) { - CacheKey cacheKey = CacheKey.forPosition(file, position, length, isIndex); + CacheKey cacheKey = CacheKey.forPosition(sstFilePath, position, length, isIndex); SegmentContainer container = blocks.get(cacheKey); if (container == null || container.getAccessCount() == CacheManager.REFRESH_COUNT) { diff --git a/paimon-common/src/main/java/org/apache/paimon/lookup/sort/BlockEntry.java b/paimon-common/src/main/java/org/apache/paimon/sst/BlockEntry.java similarity index 98% rename from paimon-common/src/main/java/org/apache/paimon/lookup/sort/BlockEntry.java rename to paimon-common/src/main/java/org/apache/paimon/sst/BlockEntry.java index 4473886e3167..efb82e9fb689 100644 --- a/paimon-common/src/main/java/org/apache/paimon/lookup/sort/BlockEntry.java +++ b/paimon-common/src/main/java/org/apache/paimon/sst/BlockEntry.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.paimon.lookup.sort; +package org.apache.paimon.sst; import org.apache.paimon.memory.MemorySlice; diff --git a/paimon-common/src/main/java/org/apache/paimon/lookup/sort/BlockHandle.java b/paimon-common/src/main/java/org/apache/paimon/sst/BlockHandle.java similarity index 98% rename from paimon-common/src/main/java/org/apache/paimon/lookup/sort/BlockHandle.java rename to paimon-common/src/main/java/org/apache/paimon/sst/BlockHandle.java index 737f57a8b018..60d4c5929cf4 100644 --- a/paimon-common/src/main/java/org/apache/paimon/lookup/sort/BlockHandle.java +++ b/paimon-common/src/main/java/org/apache/paimon/sst/BlockHandle.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.paimon.lookup.sort; +package org.apache.paimon.sst; import org.apache.paimon.memory.MemorySlice; import org.apache.paimon.memory.MemorySliceInput; diff --git a/paimon-common/src/main/java/org/apache/paimon/lookup/sort/BlockIterator.java b/paimon-common/src/main/java/org/apache/paimon/sst/BlockIterator.java similarity index 50% rename from paimon-common/src/main/java/org/apache/paimon/lookup/sort/BlockIterator.java rename to paimon-common/src/main/java/org/apache/paimon/sst/BlockIterator.java index c5647dc50b25..dfa82e285556 100644 --- a/paimon-common/src/main/java/org/apache/paimon/lookup/sort/BlockIterator.java +++ b/paimon-common/src/main/java/org/apache/paimon/sst/BlockIterator.java @@ -16,27 +16,27 @@ * limitations under the License. */ -package org.apache.paimon.lookup.sort; +package org.apache.paimon.sst; import org.apache.paimon.memory.MemorySlice; import org.apache.paimon.memory.MemorySliceInput; import java.util.Comparator; import java.util.Iterator; -import java.util.Map; import java.util.NoSuchElementException; +import java.util.function.Function; import static java.util.Objects.requireNonNull; /** An {@link Iterator} for a block. */ -public abstract class BlockIterator implements Iterator> { +public abstract class BlockIterator implements Iterator { protected final MemorySliceInput data; private final int recordCount; - private final Comparator comparator; + private int recordPosition = 0; - private BlockEntry polled; + protected final Comparator comparator; public BlockIterator( MemorySliceInput data, int recordCount, Comparator comparator) { @@ -47,7 +47,7 @@ public BlockIterator( @Override public boolean hasNext() { - return polled != null || data.isReadable(); + return data.isReadable() && recordPosition < recordCount; } @Override @@ -56,12 +56,6 @@ public BlockEntry next() { throw new NoSuchElementException(); } - if (polled != null) { - BlockEntry result = polled; - polled = null; - return result; - } - return readEntry(); } @@ -70,32 +64,88 @@ public void remove() { throw new UnsupportedOperationException(); } + /** + * Seek to the position of the target key. See {@link BlockIterator#seekTo(Object, Comparator, + * Function) seekTo}. + */ public boolean seekTo(MemorySlice targetKey) { + return seekTo(targetKey, comparator, BlockEntry::getKey); + } + + /** + * Seeks to the first entry with a value that is either equal to or greater than the specified + * value. After this call, the next invocation of {@link BlockIterator#next()} will return that + * entry (if it exists). + * + *

Note that the comparing value must be monotonically increasing across current block e.g. + * key and some special values such as the {@code lastRecordPosition} of an {@code + * IndexBlockEntry}. + * + * @param targetValue target value + * @param valueComparator comparator to compare value + * @param valueExtractor extractor to extract a compared value from an entry + * @return true if found an equal record + */ + public boolean seekTo( + T targetValue, Comparator valueComparator, Function valueExtractor) { int left = 0; int right = recordCount - 1; + int mid = recordCount; while (left <= right) { - int mid = left + (right - left) / 2; + mid = left + (right - left) / 2; seekTo(mid); BlockEntry midEntry = readEntry(); - int compare = comparator.compare(midEntry.getKey(), targetKey); + int compare = valueComparator.compare(valueExtractor.apply(midEntry), targetValue); if (compare == 0) { - polled = midEntry; - return true; + break; } else if (compare > 0) { - polled = midEntry; right = mid - 1; } else { left = mid + 1; } } - return false; + // left <= right means we found an equal key + boolean equal = left <= right; + int targetPos = equal ? mid : left; + + if (targetPos >= recordCount) { + moveToEnd(); + } else { + seekTo(targetPos); + } + + return equal; + } + + private void moveToEnd() { + data.setPosition(data.getSlice().length()); + recordPosition = recordCount; + } + + public int getRecordCount() { + return recordCount; + } + + public int getRecordPosition() { + return recordPosition; + } + + /** + * Seek to the record position. This operation is quite lightweight, only setting underlying + * data's position. + * + * @param recordPosition target record position + */ + public void seekTo(int recordPosition) { + this.recordPosition = recordPosition; + innerSeekTo(recordPosition); } - public abstract void seekTo(int record); + public abstract void innerSeekTo(int recordPosition); private BlockEntry readEntry() { requireNonNull(data, "data is null"); @@ -107,6 +157,8 @@ private BlockEntry readEntry() { int valueLength = data.readVarLenInt(); MemorySlice value = data.readSlice(valueLength); + recordPosition++; + return new BlockEntry(key, value); } } diff --git a/paimon-common/src/main/java/org/apache/paimon/lookup/sort/BlockReader.java b/paimon-common/src/main/java/org/apache/paimon/sst/BlockReader.java similarity index 93% rename from paimon-common/src/main/java/org/apache/paimon/lookup/sort/BlockReader.java rename to paimon-common/src/main/java/org/apache/paimon/sst/BlockReader.java index 544a3d42b69b..06aad751e410 100644 --- a/paimon-common/src/main/java/org/apache/paimon/lookup/sort/BlockReader.java +++ b/paimon-common/src/main/java/org/apache/paimon/sst/BlockReader.java @@ -16,13 +16,13 @@ * limitations under the License. */ -package org.apache.paimon.lookup.sort; +package org.apache.paimon.sst; import org.apache.paimon.memory.MemorySlice; import java.util.Comparator; -import static org.apache.paimon.lookup.sort.BlockAlignedType.ALIGNED; +import static org.apache.paimon.sst.BlockAlignedType.ALIGNED; /** Reader for a block. */ public class BlockReader { @@ -64,7 +64,7 @@ public AlignedIterator( } @Override - public void seekTo(int record) { + public void innerSeekTo(int record) { data.setPosition(record * recordSize); } } @@ -80,7 +80,7 @@ public UnalignedIterator( } @Override - public void seekTo(int record) { + public void innerSeekTo(int record) { data.setPosition(index.readInt(record * 4)); } } diff --git a/paimon-common/src/main/java/org/apache/paimon/lookup/sort/BlockTrailer.java b/paimon-common/src/main/java/org/apache/paimon/sst/BlockTrailer.java similarity index 61% rename from paimon-common/src/main/java/org/apache/paimon/lookup/sort/BlockTrailer.java rename to paimon-common/src/main/java/org/apache/paimon/sst/BlockTrailer.java index 6d49bd9cc543..272e1761aa4b 100644 --- a/paimon-common/src/main/java/org/apache/paimon/lookup/sort/BlockTrailer.java +++ b/paimon-common/src/main/java/org/apache/paimon/sst/BlockTrailer.java @@ -16,33 +16,24 @@ * limitations under the License. */ -package org.apache.paimon.lookup.sort; +package org.apache.paimon.sst; -import org.apache.paimon.compression.BlockCompressionType; import org.apache.paimon.memory.MemorySlice; import org.apache.paimon.memory.MemorySliceInput; import org.apache.paimon.memory.MemorySliceOutput; -import static java.util.Objects.requireNonNull; +import java.util.Objects; /** Trailer of a block. */ public class BlockTrailer { - public static final int ENCODED_LENGTH = 5; + public static final int ENCODED_LENGTH = 4; - private final BlockCompressionType compressionType; private final int crc32c; - public BlockTrailer(BlockCompressionType compressionType, int crc32c) { - requireNonNull(compressionType, "compressionType is null"); - - this.compressionType = compressionType; + public BlockTrailer(int crc32c) { this.crc32c = crc32c; } - public BlockCompressionType getCompressionType() { - return compressionType; - } - public int getCrc32c() { return crc32c; } @@ -57,34 +48,22 @@ public boolean equals(Object o) { } BlockTrailer that = (BlockTrailer) o; - if (crc32c != that.crc32c) { - return false; - } - return compressionType == that.compressionType; + return crc32c == that.crc32c; } @Override public int hashCode() { - int result = compressionType.hashCode(); - result = 31 * result + crc32c; - return result; + return Objects.hash(crc32c); } @Override public String toString() { - return "BlockTrailer" - + "{compressionType=" - + compressionType - + ", crc32c=0x" - + Integer.toHexString(crc32c) - + '}'; + return "BlockTrailer" + "{crc32c=0x" + Integer.toHexString(crc32c) + '}'; } public static BlockTrailer readBlockTrailer(MemorySliceInput input) { - BlockCompressionType compressionType = - BlockCompressionType.getCompressionTypeByPersistentId(input.readUnsignedByte()); int crc32c = input.readInt(); - return new BlockTrailer(compressionType, crc32c); + return new BlockTrailer(crc32c); } public static MemorySlice writeBlockTrailer(BlockTrailer blockTrailer) { @@ -94,7 +73,6 @@ public static MemorySlice writeBlockTrailer(BlockTrailer blockTrailer) { } public static void writeBlockTrailer(BlockTrailer blockTrailer, MemorySliceOutput sliceOutput) { - sliceOutput.writeByte(blockTrailer.getCompressionType().persistentId()); sliceOutput.writeInt(blockTrailer.getCrc32c()); } } diff --git a/paimon-common/src/main/java/org/apache/paimon/lookup/sort/BlockWriter.java b/paimon-common/src/main/java/org/apache/paimon/sst/BlockWriter.java similarity index 69% rename from paimon-common/src/main/java/org/apache/paimon/lookup/sort/BlockWriter.java rename to paimon-common/src/main/java/org/apache/paimon/sst/BlockWriter.java index 340abac8620c..a098a7474f88 100644 --- a/paimon-common/src/main/java/org/apache/paimon/lookup/sort/BlockWriter.java +++ b/paimon-common/src/main/java/org/apache/paimon/sst/BlockWriter.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.paimon.lookup.sort; +package org.apache.paimon.sst; import org.apache.paimon.memory.MemorySlice; import org.apache.paimon.memory.MemorySliceOutput; @@ -24,12 +24,35 @@ import java.io.IOException; -import static org.apache.paimon.lookup.sort.BlockAlignedType.ALIGNED; -import static org.apache.paimon.lookup.sort.BlockAlignedType.UNALIGNED; +import static org.apache.paimon.sst.BlockAlignedType.ALIGNED; +import static org.apache.paimon.sst.BlockAlignedType.UNALIGNED; -/** Writer to build a Block. */ +/** + * Writer to build a Block. A block is designed for storing and random-accessing k-v pairs. The + * layout is as below: + * + *

+ *     +---------------+
+ *     | Block Trailer |
+ *     +------------------------------------------------+
+ *     |                  Block CRC23C                  |
+ *     +------------------------------------------------+
+ *     +---------------+
+ *     |  Block Data   |
+ *     +---------------+--------------------------------+----+
+ *     | key len | key bytes | value len | value bytes  |    |
+ *     +------------------------------------------------+    |
+ *     | key len | key bytes | value len | value bytes  |    +-> Key-Value pairs
+ *     +------------------------------------------------+    |
+ *     |                  ... ...                       |    |
+ *     +------------------------------------------------+----+
+ *     | entry pos | entry pos |     ...     | entry pos|    +-> optional, for unaligned block
+ *     +------------------------------------------------+----+
+ *     |   entry num  /  entry size   |   aligned type  |
+ *     +------------------------------------------------+
+ * 
+ */ public class BlockWriter { - private final IntArrayList positions; private final MemorySliceOutput block; diff --git a/paimon-common/src/main/java/org/apache/paimon/lookup/sort/BloomFilterHandle.java b/paimon-common/src/main/java/org/apache/paimon/sst/BloomFilterHandle.java similarity index 98% rename from paimon-common/src/main/java/org/apache/paimon/lookup/sort/BloomFilterHandle.java rename to paimon-common/src/main/java/org/apache/paimon/sst/BloomFilterHandle.java index 7ec6a845c525..7f3804dd8d41 100644 --- a/paimon-common/src/main/java/org/apache/paimon/lookup/sort/BloomFilterHandle.java +++ b/paimon-common/src/main/java/org/apache/paimon/sst/BloomFilterHandle.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.paimon.lookup.sort; +package org.apache.paimon.sst; import java.util.Objects; diff --git a/paimon-common/src/main/java/org/apache/paimon/sst/FileInfo.java b/paimon-common/src/main/java/org/apache/paimon/sst/FileInfo.java new file mode 100644 index 000000000000..413fa9df4657 --- /dev/null +++ b/paimon-common/src/main/java/org/apache/paimon/sst/FileInfo.java @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.sst; + +import org.apache.paimon.memory.MemorySlice; +import org.apache.paimon.memory.MemorySliceInput; +import org.apache.paimon.memory.MemorySliceOutput; + +import java.util.Arrays; + +/** Contains some important information of an SST file. */ +public class FileInfo { + private static final int FIXED_PART_LENGTH = 16; + private byte[] firstKey = null; + private int avgKeyLength = 0; + private int avgValueLength = 0; + private int maxKeyLength = -1; + private int maxValueLength = -1; + + public FileInfo( + byte[] firstKey, + int avgKeyLength, + int avgValueLength, + int maxKeyLength, + int maxValueLength) { + this.firstKey = firstKey; + this.avgKeyLength = avgKeyLength; + this.avgValueLength = avgValueLength; + this.maxKeyLength = maxKeyLength; + this.maxValueLength = maxValueLength; + } + + public int getMaxKeyLength() { + return maxKeyLength; + } + + public int getMaxValueLength() { + return maxValueLength; + } + + public long getAvgKeyLength() { + return avgKeyLength; + } + + public long getAvgValueLength() { + return avgValueLength; + } + + public byte[] getFirstKey() { + return firstKey; + } + + public int memorySize() { + return FIXED_PART_LENGTH + (firstKey == null ? 0 : firstKey.length); + } + + public static FileInfo readFileInfo(MemorySliceInput sliceInput) { + byte[] firstKey = null; + int firstKeyLength = sliceInput.readInt(); + if (firstKeyLength > 0) { + firstKey = sliceInput.readSlice(firstKeyLength).copyBytes(); + } + return new FileInfo( + firstKey, + sliceInput.readInt(), + sliceInput.readInt(), + sliceInput.readInt(), + sliceInput.readInt()); + } + + public static MemorySlice writeFileInfo(FileInfo fileInfo) { + MemorySliceOutput memorySlice = new MemorySliceOutput(fileInfo.memorySize()); + writeFileInfo(fileInfo, memorySlice); + return memorySlice.toSlice(); + } + + public static void writeFileInfo(FileInfo fileInfo, MemorySliceOutput sliceOutput) { + // 1. write first key + if (fileInfo.firstKey != null) { + sliceOutput.writeInt(fileInfo.firstKey.length); + sliceOutput.writeBytes(fileInfo.firstKey); + } else { + sliceOutput.writeInt(0); + } + // 2. write other statistics + sliceOutput.writeInt(fileInfo.avgKeyLength); + sliceOutput.writeInt(fileInfo.avgValueLength); + sliceOutput.writeInt(fileInfo.maxKeyLength); + sliceOutput.writeInt(fileInfo.maxValueLength); + } + + @Override + public String toString() { + return "FileInfo{" + + "firstKey=" + + Arrays.toString(firstKey) + + ", avgKeyLength=" + + avgKeyLength + + ", avgValueLength=" + + avgValueLength + + ", maxKeyLength=" + + maxKeyLength + + ", maxValueLength=" + + maxValueLength + + '}'; + } + + /** Builder for file info. */ + public static class Builder { + private byte[] firstKey = null; + private long avgKeyLength = 0; + private long avgValueLength = 0; + private int maxKeyLength = -1; + private int maxValueLength = -1; + private int rowCount = 0; + + public Builder() {} + + public void update(byte[] key, byte[] value) { + if (firstKey == null) { + firstKey = Arrays.copyOf(key, key.length); + } + avgValueLength += value.length; + avgKeyLength += key.length; + maxKeyLength = Math.max(maxKeyLength, key.length); + maxValueLength = Math.max(maxValueLength, value.length); + rowCount++; + } + + public FileInfo build() { + return new FileInfo( + firstKey, + rowCount == 0 ? 0 : (int) (avgKeyLength / rowCount), + rowCount == 0 ? 0 : (int) (avgValueLength / rowCount), + maxKeyLength, + maxValueLength); + } + } +} diff --git a/paimon-common/src/main/java/org/apache/paimon/sst/Footer.java b/paimon-common/src/main/java/org/apache/paimon/sst/Footer.java new file mode 100644 index 000000000000..58bf28129c56 --- /dev/null +++ b/paimon-common/src/main/java/org/apache/paimon/sst/Footer.java @@ -0,0 +1,232 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.sst; + +import org.apache.paimon.compression.BlockCompressionType; +import org.apache.paimon.memory.MemorySlice; +import org.apache.paimon.memory.MemorySliceInput; +import org.apache.paimon.memory.MemorySliceOutput; + +import javax.annotation.Nullable; + +import java.io.IOException; + +import static org.apache.paimon.sst.SstFileWriter.MAGIC_NUMBER; +import static org.apache.paimon.utils.Preconditions.checkArgument; + +/** + * A fixed-length Footer for an SST file. The layout is as below: + * + *
+ *     +---------------------------------------------------------------+
+ *     |   FileInfo Offset : 8 bytes    |    FileInfo size : 4 bytes   |
+ *     +---------------------------------------------------------------+
+ *     | BF offset : 8 bytes | BF size : 4 bytes | BF entries : 8 bytes|
+ *     +---------------------------------------------------------------+
+ *     |  IndexBlock Offset : 8 bytes   |   IndexBlock size : 4 bytes  |
+ *     +---------------------------------------------------------------+
+ *     |   MetaBlock Offset : 8 bytes   |    MetaBlock size : 4 bytes  |
+ *     +---------------------------------------------------------------+
+ *     | RowCnt : 4 bytes | DataSize : 8 bytes | CompressType : 1 byte |
+ *     +---------------------------------------------------------------+
+ *     |       Version : 4 bytes        |     Magic Number : 4 bytes   |
+ *     +---------------------------------------------------------------+
+ * 
+ */ +public class Footer { + + public static final int ENCODED_LENGTH = 77; + private final BlockHandle indexBlockHandle; + private final BlockHandle fileInfoHandle; + private final int version; + private final long totalUncompressedBytes; + private final int rowCount; + private final BlockCompressionType compressionType; + @Nullable private final BloomFilterHandle bloomFilterHandle; + @Nullable private final BlockHandle metaBlockHandle; + + Footer( + int version, + long totalUncompressedBytes, + int rowCount, + BlockCompressionType compressionType, + BlockHandle indexBlockHandle, + BlockHandle fileInfoHandle, + @Nullable BloomFilterHandle bloomFilterHandle, + @Nullable BlockHandle metaBlockHandle) { + this.bloomFilterHandle = bloomFilterHandle; + this.indexBlockHandle = indexBlockHandle; + this.metaBlockHandle = metaBlockHandle; + this.fileInfoHandle = fileInfoHandle; + this.version = version; + this.totalUncompressedBytes = totalUncompressedBytes; + this.rowCount = rowCount; + this.compressionType = compressionType; + } + + @Nullable + public BloomFilterHandle getBloomFilterHandle() { + return bloomFilterHandle; + } + + public BlockHandle getIndexBlockHandle() { + return indexBlockHandle; + } + + public int getVersion() { + return version; + } + + public BlockHandle getFileInfoHandle() { + return fileInfoHandle; + } + + @Nullable + public BlockHandle getMetaBlockHandle() { + return metaBlockHandle; + } + + public int getRowCount() { + return rowCount; + } + + public long getTotalUncompressedBytes() { + return totalUncompressedBytes; + } + + public BlockCompressionType getCompressionType() { + return compressionType; + } + + public static Footer readFooter(MemorySliceInput sliceInput) throws IOException { + // 1. read file info + BlockHandle fileInfoHandle = new BlockHandle(sliceInput.readLong(), sliceInput.readInt()); + + // 2. read optional bloom filter handle + @Nullable + BloomFilterHandle bloomFilterHandle = + new BloomFilterHandle( + sliceInput.readLong(), sliceInput.readInt(), sliceInput.readLong()); + if (bloomFilterHandle.offset() == 0 + && bloomFilterHandle.size() == 0 + && bloomFilterHandle.expectedEntries() == 0) { + bloomFilterHandle = null; + } + + // 3. read index block handle + BlockHandle indexBlockHandle = new BlockHandle(sliceInput.readLong(), sliceInput.readInt()); + + // 4. read optional metadata handle + BlockHandle metaBlockHandle = new BlockHandle(sliceInput.readLong(), sliceInput.readInt()); + if (metaBlockHandle.offset() == 0 && metaBlockHandle.size() == 0) { + metaBlockHandle = null; + } + + // skip padding + sliceInput.setPosition(ENCODED_LENGTH - 21); + + int rowCount = sliceInput.readInt(); + long totalUncompressedBytes = sliceInput.readLong(); + BlockCompressionType compressionType = + BlockCompressionType.getCompressionTypeByPersistentId( + sliceInput.readUnsignedByte()); + int version = sliceInput.readInt(); + + // verify magic number + int magicNumber = sliceInput.readInt(); + checkArgument(magicNumber == MAGIC_NUMBER, "File is not a table (bad magic number)"); + + return new Footer( + version, + totalUncompressedBytes, + rowCount, + compressionType, + indexBlockHandle, + fileInfoHandle, + bloomFilterHandle, + metaBlockHandle); + } + + public static MemorySlice writeFooter(Footer footer) { + MemorySliceOutput output = new MemorySliceOutput(ENCODED_LENGTH); + writeFooter(footer, output); + return output.toSlice(); + } + + public static void writeFooter(Footer footer, MemorySliceOutput sliceOutput) { + // 1. write file info + sliceOutput.writeLong(footer.fileInfoHandle.offset()); + sliceOutput.writeInt(footer.fileInfoHandle.size()); + + // 2. write optional bloom filter handle + if (footer.bloomFilterHandle == null) { + sliceOutput.writeLong(0); + sliceOutput.writeInt(0); + sliceOutput.writeLong(0); + } else { + sliceOutput.writeLong(footer.bloomFilterHandle.offset()); + sliceOutput.writeInt(footer.bloomFilterHandle.size()); + sliceOutput.writeLong(footer.bloomFilterHandle.expectedEntries()); + } + + // 3. write index block + sliceOutput.writeLong(footer.indexBlockHandle.offset()); + sliceOutput.writeInt(footer.indexBlockHandle.size()); + + // 4. write optional metadata handle + if (footer.metaBlockHandle == null) { + sliceOutput.writeLong(0); + sliceOutput.writeInt(0); + } else { + sliceOutput.writeLong(footer.metaBlockHandle.offset()); + sliceOutput.writeInt(footer.metaBlockHandle.size()); + } + + // 5. write fixed-size statistics + sliceOutput.writeInt(footer.rowCount); + sliceOutput.writeLong(footer.totalUncompressedBytes); + sliceOutput.writeByte(footer.compressionType.persistentId()); + + // 6. write version and magic number + sliceOutput.writeInt(footer.version); + sliceOutput.writeInt(MAGIC_NUMBER); + } + + @Override + public String toString() { + return "Footer{" + + "indexBlockHandle=" + + indexBlockHandle + + ", fileInfoHandle=" + + fileInfoHandle + + ", version=" + + version + + ", totalUncompressedBytes=" + + totalUncompressedBytes + + ", rowCount=" + + rowCount + + ", compressionType=" + + compressionType + + ", bloomFilterHandle=" + + bloomFilterHandle + + ", metaBlockHandle=" + + metaBlockHandle + + '}'; + } +} diff --git a/paimon-common/src/main/java/org/apache/paimon/sst/IndexEntryHandle.java b/paimon-common/src/main/java/org/apache/paimon/sst/IndexEntryHandle.java new file mode 100644 index 000000000000..f34bf88a58c2 --- /dev/null +++ b/paimon-common/src/main/java/org/apache/paimon/sst/IndexEntryHandle.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.sst; + +import org.apache.paimon.memory.MemorySlice; +import org.apache.paimon.memory.MemorySliceInput; +import org.apache.paimon.memory.MemorySliceOutput; + +/** + * Handle of each index entry. The entries of an IndexBlock is a set of special sorted key-value + * pairs. Its layout is as below: + * + *
+ *     +--------------------------------------------------------+
+ *     |  Key  |          Last key of the Data Block            |
+ *     +--------------------------------------------------------+
+ *     | Value | Block Offset | Block Size |  Last Record Pos   |
+ *     +--------------------------------------------------------+
+ * 
+ */ +public class IndexEntryHandle { + private static final int MAX_ENCODED_LENGTH = 9 + 5 + 5; + private final long offset; + private final int size; + + /** The last record position of this index entry. */ + private final int lastRecordPosition; + + public IndexEntryHandle(long offset, int size, int lastRecordPosition) { + this.offset = offset; + this.size = size; + this.lastRecordPosition = lastRecordPosition; + } + + public int getLastRecordPosition() { + return lastRecordPosition; + } + + public int getSize() { + return size; + } + + public long getOffset() { + return offset; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + IndexEntryHandle other = (IndexEntryHandle) o; + return offset == other.offset + && size == other.size + && lastRecordPosition == other.lastRecordPosition; + } + + public static IndexEntryHandle read(MemorySliceInput sliceInput) { + return new IndexEntryHandle( + sliceInput.readVarLenLong(), + sliceInput.readVarLenInt(), + sliceInput.readVarLenInt()); + } + + public static MemorySlice write(IndexEntryHandle indexEntryHandle) { + MemorySliceOutput sliceOutput = new MemorySliceOutput(MAX_ENCODED_LENGTH); + write(indexEntryHandle, sliceOutput); + return sliceOutput.toSlice(); + } + + private static void write(IndexEntryHandle indexEntryHandle, MemorySliceOutput sliceOutput) { + sliceOutput.writeVarLenLong(indexEntryHandle.offset); + sliceOutput.writeVarLenInt(indexEntryHandle.size); + sliceOutput.writeVarLenInt(indexEntryHandle.lastRecordPosition); + } +} diff --git a/paimon-common/src/main/java/org/apache/paimon/sst/SstFileLookupReader.java b/paimon-common/src/main/java/org/apache/paimon/sst/SstFileLookupReader.java new file mode 100644 index 000000000000..d3dd24eededc --- /dev/null +++ b/paimon-common/src/main/java/org/apache/paimon/sst/SstFileLookupReader.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.sst; + +import org.apache.paimon.fs.Path; +import org.apache.paimon.fs.SeekableInputStream; +import org.apache.paimon.memory.MemorySlice; +import org.apache.paimon.utils.MurmurHashUtils; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.util.Comparator; + +/** An SST File Reader to serve lookup queries. */ +public class SstFileLookupReader extends AbstractSstFileReader { + public SstFileLookupReader( + SeekableInputStream input, + Comparator comparator, + long fileSize, + Path filePath, + @Nullable BlockCache blockCache) + throws IOException { + super(input, comparator, fileSize, filePath, blockCache); + } + + /** + * Lookup the specified key in the file. + * + * @param key serialized key + * @return corresponding serialized value, null if not found. + */ + public byte[] lookup(byte[] key) throws IOException { + if (bloomFilter != null && !bloomFilter.testHash(MurmurHashUtils.hashBytes(key))) { + return null; + } + + MemorySlice keySlice = MemorySlice.wrap(key); + if (firstKey == null || comparator.compare(firstKey, keySlice) > 0) { + return null; + } + // seek the index to the block containing the key + indexBlockIterator.seekTo(keySlice); + + // if indexIterator does not have a next, it means the key does not exist in this iterator + if (indexBlockIterator.hasNext()) { + // seek the current iterator to the key + BlockIterator current = getNextBlock(indexBlockIterator); + if (current.seekTo(keySlice)) { + return current.next().getValue().copyBytes(); + } + } + return null; + } +} diff --git a/paimon-common/src/main/java/org/apache/paimon/sst/SstFileScanReader.java b/paimon-common/src/main/java/org/apache/paimon/sst/SstFileScanReader.java new file mode 100644 index 000000000000..2647fcf0a022 --- /dev/null +++ b/paimon-common/src/main/java/org/apache/paimon/sst/SstFileScanReader.java @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.sst; + +import org.apache.paimon.fs.Path; +import org.apache.paimon.fs.SeekableInputStream; +import org.apache.paimon.memory.MemorySlice; +import org.apache.paimon.utils.Preconditions; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.util.Comparator; + +/** An SST File Reader to serve scan queries. */ +public class SstFileScanReader extends AbstractSstFileReader { + private BlockIterator currentDataIterator; + + public SstFileScanReader( + SeekableInputStream input, + Comparator comparator, + long fileSize, + Path filePath, + @Nullable BlockCache blockCache) + throws IOException { + super(input, comparator, fileSize, filePath, blockCache); + this.currentDataIterator = null; + } + + /** + * Seek to the position of the record whose key is equal to the key or is the smallest element + * greater than the given key. + * + * @param key the key to seek + * @return record position of the seeked record, -1 if not found. + */ + public int seekTo(byte[] key) throws IOException { + if (footer.getRowCount() == 0) { + return -1; + } + MemorySlice keySlice = MemorySlice.wrap(key); + // find candidate index block + indexBlockIterator.seekTo(keySlice); + if (indexBlockIterator.hasNext()) { + // avoid fetching data block if the key is smaller than firstKey + if (comparator.compare(firstKey, keySlice) > 0) { + return 0; + } + IndexEntryHandle entryHandle = + IndexEntryHandle.read(indexBlockIterator.next().getValue().toInput()); + currentDataIterator = + readBlock(new BlockHandle(entryHandle.getOffset(), entryHandle.getSize())) + .iterator(); + currentDataIterator.seekTo(keySlice); + int recordCount = currentDataIterator.getRecordCount(); + int positionInBlock = currentDataIterator.getRecordPosition(); + Preconditions.checkState( + positionInBlock >= 0, "Position inside data block should >= 0, it's a bug."); + return entryHandle.getLastRecordPosition() - (recordCount - positionInBlock - 1); + } + return -1; + } + + /** + * Seek to the specified record position. + * + * @param position position + */ + public void seekTo(int position) throws IOException { + if (position < 0 || position >= footer.getRowCount()) { + throw new IndexOutOfBoundsException( + String.format( + "Index out of range, file %s only have %s rows, but trying to seek to %s", + filePath, footer.getRowCount(), position)); + } + + // 1. seekTo the block entry exactly containing the position + indexBlockIterator.seekTo( + position, + Comparator.naturalOrder(), + entry -> IndexEntryHandle.read(entry.getValue().toInput()).getLastRecordPosition()); + + // 2. fetch the data block + Preconditions.checkState(indexBlockIterator.hasNext()); + IndexEntryHandle entryHandle = + IndexEntryHandle.read(indexBlockIterator.next().getValue().toInput()); + currentDataIterator = + readBlock(new BlockHandle(entryHandle.getOffset(), entryHandle.getSize())) + .iterator(); + + // 3. seek to the inner position of data block + int positionInBlock = + position + - (entryHandle.getLastRecordPosition() + - currentDataIterator.getRecordCount() + + 1); + currentDataIterator.seekTo(positionInBlock); + } + + /** + * Read a batch of records from current position. + * + * @return a batch of entries, null if reaching file end + */ + public BlockIterator readBatch() throws IOException { + BlockIterator result = null; + if (currentDataIterator == null || !currentDataIterator.hasNext()) { + // reach file end + if (!indexBlockIterator.hasNext()) { + return null; + } + currentDataIterator = getNextBlock(indexBlockIterator); + } + result = currentDataIterator; + currentDataIterator = null; + return result; + } +} diff --git a/paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortLookupStoreUtils.java b/paimon-common/src/main/java/org/apache/paimon/sst/SstFileUtils.java similarity index 76% rename from paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortLookupStoreUtils.java rename to paimon-common/src/main/java/org/apache/paimon/sst/SstFileUtils.java index 8339ac01f894..903d5e89eb42 100644 --- a/paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortLookupStoreUtils.java +++ b/paimon-common/src/main/java/org/apache/paimon/sst/SstFileUtils.java @@ -16,27 +16,24 @@ * limitations under the License. */ -package org.apache.paimon.lookup.sort; +package org.apache.paimon.sst; -import org.apache.paimon.compression.BlockCompressionType; import org.apache.paimon.memory.MemorySegment; import org.apache.paimon.memory.MemorySlice; import java.util.zip.CRC32; /** Utils for sort lookup store. */ -public class SortLookupStoreUtils { - public static int crc32c(MemorySlice data, BlockCompressionType type) { +public class SstFileUtils { + public static int crc32c(MemorySlice data) { CRC32 crc = new CRC32(); crc.update(data.getHeapMemory(), data.offset(), data.length()); - crc.update(type.persistentId() & 0xFF); return (int) crc.getValue(); } - public static int crc32c(MemorySegment data, BlockCompressionType type) { + public static int crc32c(MemorySegment data) { CRC32 crc = new CRC32(); crc.update(data.getHeapMemory(), 0, data.size()); - crc.update(type.persistentId() & 0xFF); return (int) crc.getValue(); } } diff --git a/paimon-common/src/main/java/org/apache/paimon/sst/SstFileWriter.java b/paimon-common/src/main/java/org/apache/paimon/sst/SstFileWriter.java new file mode 100644 index 000000000000..dcae27418b4e --- /dev/null +++ b/paimon-common/src/main/java/org/apache/paimon/sst/SstFileWriter.java @@ -0,0 +1,253 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.sst; + +import org.apache.paimon.compression.BlockCompressionFactory; +import org.apache.paimon.compression.BlockCompressionType; +import org.apache.paimon.compression.BlockCompressor; +import org.apache.paimon.format.FileFormat; +import org.apache.paimon.fs.PositionOutputStream; +import org.apache.paimon.memory.MemorySegment; +import org.apache.paimon.memory.MemorySlice; +import org.apache.paimon.options.MemorySize; +import org.apache.paimon.utils.BloomFilter; +import org.apache.paimon.utils.MurmurHashUtils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.io.Closeable; +import java.io.IOException; + +import static org.apache.paimon.memory.MemorySegmentUtils.allocateReuseBytes; +import static org.apache.paimon.sst.SstFileUtils.crc32c; +import static org.apache.paimon.utils.VarLengthIntUtils.encodeInt; + +/** + * A {@link FileFormat} for SST Files. SST Files are row-oriented and designed to serve frequent + * point queries and range queries by key. The SST File layout is as below: (For layouts of each + * block type, please refer to corresponding classes) + * + *
+ *     +-----------------------------------+------+
+ *     |             Footer                |      |
+ *     +-----------------------------------+      |
+ *     |            File Info              |      |
+ *     +-----------------------------------+      +--> Loaded on open
+ *     |        Bloom Filter Block         |      |
+ *     +-----------------------------------+      |
+ *     |           Index Block             |      |
+ *     +-----------------------------------+------+
+ *     |          Metadata Block           |      |
+ *     +-----------------------------------+      |
+ *     |            Data Block             |      |
+ *     +-----------------------------------+      +--> Loaded on need
+ *                    ......                      |
+ *     +-----------------------------------+      |
+ *     |            Data Block             |      |
+ *     +-----------------------------------+------+
+ * 
+ */ +public class SstFileWriter implements Closeable { + + private static final Logger LOG = LoggerFactory.getLogger(SstFileWriter.class.getName()); + + public static final int MAGIC_NUMBER = 1481571681; + public static final int VERSION = 1; + + private final PositionOutputStream out; + private final int blockSize; + private final BlockWriter dataBlockWriter; + private final BlockWriter indexBlockWriter; + private final BlockWriter metaBlockWriter; + @Nullable private final BloomFilter.Builder bloomFilter; + private final BlockCompressionType compressionType; + @Nullable private final BlockCompressor blockCompressor; + private final FileInfo.Builder fileInfoBuilder; + + private byte[] lastKey; + private long position; + + private long recordCount; + private long totalUncompressedSize; + private long totalCompressedSize; + + public SstFileWriter( + PositionOutputStream out, + int blockSize, + @Nullable BloomFilter.Builder bloomFilter, + @Nullable BlockCompressionFactory compressionFactory) { + this.out = out; + this.blockSize = blockSize; + this.dataBlockWriter = new BlockWriter((int) (blockSize * 1.1)); + int expectedNumberOfBlocks = 1024; + this.indexBlockWriter = + new BlockWriter(BlockHandle.MAX_ENCODED_LENGTH * expectedNumberOfBlocks); + this.metaBlockWriter = new BlockWriter(blockSize); + this.bloomFilter = bloomFilter; + if (compressionFactory == null) { + this.compressionType = BlockCompressionType.NONE; + this.blockCompressor = null; + } else { + this.compressionType = compressionFactory.getCompressionType(); + this.blockCompressor = compressionFactory.getCompressor(); + } + this.fileInfoBuilder = new FileInfo.Builder(); + } + + public void put(byte[] key, byte[] value) throws IOException { + dataBlockWriter.add(key, value); + recordCount++; + + if (bloomFilter != null) { + bloomFilter.addHash(MurmurHashUtils.hashBytes(key)); + } + + lastKey = key; + + if (dataBlockWriter.memory() > blockSize) { + flush(); + } + + fileInfoBuilder.update(key, value); + } + + public void putMetaData(byte[] metaKey, byte[] metaValue) throws IOException { + metaBlockWriter.add(metaKey, metaValue); + } + + private void flush() throws IOException { + if (dataBlockWriter.size() == 0) { + return; + } + + int lastRecordPosition = (int) (recordCount - 1); + BlockHandle blockHandle = writeBlock(dataBlockWriter); + MemorySlice handleEncoding = + IndexEntryHandle.write( + new IndexEntryHandle( + blockHandle.offset(), blockHandle.size(), lastRecordPosition)); + indexBlockWriter.add(lastKey, handleEncoding.copyBytes()); + } + + private BlockHandle writeBlock(BlockWriter blockWriter) throws IOException { + // close the block + MemorySlice block = blockWriter.finish(); + + totalUncompressedSize += block.length(); + + // attempt to compress the block + if (blockCompressor != null) { + int maxCompressedSize = blockCompressor.getMaxCompressedSize(block.length()); + byte[] compressed = allocateReuseBytes(maxCompressedSize + 5); + int offset = encodeInt(compressed, 0, block.length()); + int compressedSize = + offset + + blockCompressor.compress( + block.getHeapMemory(), + block.offset(), + block.length(), + compressed, + offset); + + block = new MemorySlice(MemorySegment.wrap(compressed), 0, compressedSize); + } + + totalCompressedSize += block.length(); + + // create block trailer + BlockTrailer blockTrailer = new BlockTrailer(crc32c(block)); + MemorySlice trailer = BlockTrailer.writeBlockTrailer(blockTrailer); + + // create a handle to this block + BlockHandle blockHandle = new BlockHandle(position, block.length()); + + // write data + writeSlice(block); + + // write trailer: 4 bytes + writeSlice(trailer); + + // clean up state + blockWriter.reset(); + + return blockHandle; + } + + @Override + public void close() throws IOException { + // flush current data block + flush(); + + LOG.info("Number of record: {}", recordCount); + + // 1. write meta block if present + @Nullable BlockHandle metaBlockHandle = null; + if (metaBlockWriter.size() > 0) { + metaBlockHandle = writeBlock(metaBlockWriter); + } + + // 2. write index block + BlockHandle indexBlockHandle = writeBlock(indexBlockWriter); + + // 3. write bloom filter + @Nullable BloomFilterHandle bloomFilterHandle = null; + if (bloomFilter != null) { + MemorySegment buffer = bloomFilter.getBuffer(); + bloomFilterHandle = + new BloomFilterHandle(position, buffer.size(), bloomFilter.expectedEntries()); + writeSlice(MemorySlice.wrap(buffer)); + LOG.info("Bloom filter size: {} bytes", bloomFilter.getBuffer().size()); + } + + // 4. write file info block + FileInfo fileInfo = fileInfoBuilder.build(); + MemorySlice fileInfoEncoding = FileInfo.writeFileInfo(fileInfo); + BlockHandle fileInfoHandle = new BlockHandle(position, fileInfoEncoding.length()); + writeSlice(fileInfoEncoding); + + // 5. write footer + Footer footer = + new Footer( + VERSION, + totalUncompressedSize, + (int) recordCount, + compressionType, + indexBlockHandle, + fileInfoHandle, + bloomFilterHandle, + metaBlockHandle); + MemorySlice footerEncoding = Footer.writeFooter(footer); + writeSlice(footerEncoding); + + LOG.info("totalUncompressedSize: {}", MemorySize.ofBytes(totalUncompressedSize)); + LOG.info("totalCompressedSize: {}", MemorySize.ofBytes(totalCompressedSize)); + } + + private void writeSlice(MemorySlice slice) throws IOException { + out.write(slice.getHeapMemory(), slice.offset(), slice.length()); + position += slice.length(); + } + + public long getTotalCompressedSize() { + return totalUncompressedSize; + } +} diff --git a/paimon-common/src/test/java/org/apache/paimon/sst/SstFileTest.java b/paimon-common/src/test/java/org/apache/paimon/sst/SstFileTest.java new file mode 100644 index 000000000000..9d6da465085b --- /dev/null +++ b/paimon-common/src/test/java/org/apache/paimon/sst/SstFileTest.java @@ -0,0 +1,320 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.sst; + +import org.apache.paimon.compression.BlockCompressionFactory; +import org.apache.paimon.compression.CompressOptions; +import org.apache.paimon.fs.FileIO; +import org.apache.paimon.fs.Path; +import org.apache.paimon.fs.PositionOutputStream; +import org.apache.paimon.fs.SeekableInputStream; +import org.apache.paimon.fs.local.LocalFileIO; +import org.apache.paimon.io.cache.CacheManager; +import org.apache.paimon.memory.MemorySlice; +import org.apache.paimon.memory.MemorySliceOutput; +import org.apache.paimon.options.MemorySize; +import org.apache.paimon.testutils.junit.parameterized.ParameterizedTestExtension; +import org.apache.paimon.testutils.junit.parameterized.Parameters; +import org.apache.paimon.utils.BloomFilter; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.TestTemplate; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.api.io.TempDir; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Arrays; +import java.util.Comparator; +import java.util.Iterator; +import java.util.List; +import java.util.Random; +import java.util.UUID; + +/** Test for {@link AbstractSstFileReader} and {@link SstFileWriter}. */ +@ExtendWith(ParameterizedTestExtension.class) +public class SstFileTest { + private static final Logger LOG = LoggerFactory.getLogger(SstFileTest.class); + + // 256 records per block + private static final int BLOCK_SIZE = (10) * 256; + private static final CacheManager CACHE_MANAGER = new CacheManager(MemorySize.ofMebiBytes(10)); + @TempDir java.nio.file.Path tempPath; + + private final boolean bloomFilterEnabled; + private final CompressOptions compress; + + private FileIO fileIO; + private Path file; + private Path parent; + + public SstFileTest(List var) { + this.bloomFilterEnabled = (Boolean) var.get(0); + this.compress = new CompressOptions((String) var.get(1), 1); + } + + @SuppressWarnings("unused") + @Parameters(name = "enableBf&compress-{0}") + public static List> getVarSeg() { + return Arrays.asList( + Arrays.asList(true, "none"), + Arrays.asList(false, "none"), + Arrays.asList(false, "lz4"), + Arrays.asList(true, "lz4"), + Arrays.asList(false, "zstd"), + Arrays.asList(true, "zstd")); + } + + @BeforeEach + public void beforeEach() { + this.fileIO = LocalFileIO.create(); + this.parent = new Path(tempPath.toUri()); + this.file = new Path(new Path(tempPath.toUri()), UUID.randomUUID().toString()); + } + + private void writeData(int recordCount, boolean withBloomFilter) throws Exception { + BloomFilter.Builder bloomFilter = null; + if (withBloomFilter) { + bloomFilter = BloomFilter.builder(recordCount, 0.05); + } + BlockCompressionFactory compressionFactory = BlockCompressionFactory.create(compress); + try (PositionOutputStream outputStream = fileIO.newOutputStream(file, true); + SstFileWriter writer = + new SstFileWriter( + outputStream, BLOCK_SIZE, bloomFilter, compressionFactory); ) { + MemorySliceOutput keyOut = new MemorySliceOutput(4); + MemorySliceOutput valueOut = new MemorySliceOutput(4); + long start = System.currentTimeMillis(); + for (int i = 0; i < recordCount; i++) { + keyOut.reset(); + valueOut.reset(); + keyOut.writeInt(i); + valueOut.writeInt(i); + writer.put(keyOut.toSlice().getHeapMemory(), valueOut.toSlice().getHeapMemory()); + } + LOG.info("Write {} data cost {} ms", recordCount, System.currentTimeMillis() - start); + } + } + + @TestTemplate + public void testLookup() throws Exception { + writeData(5000, bloomFilterEnabled); + innerTestLookup(); + } + + private void innerTestLookup() throws Exception { + long fileSize = fileIO.getFileSize(file); + try (SeekableInputStream inputStream = fileIO.newInputStream(file); + BlockCache blockCache = new BlockCache(file, inputStream, CACHE_MANAGER); + SstFileLookupReader reader = + new SstFileLookupReader( + inputStream, + Comparator.comparingInt(slice -> slice.readInt(0)), + fileSize, + file, + blockCache); ) { + Random random = new Random(); + MemorySliceOutput keyOut = new MemorySliceOutput(4); + + // 1. lookup random existing keys + for (int i = 0; i < 100; i++) { + int key = random.nextInt(5000); + keyOut.reset(); + keyOut.writeInt(key); + byte[] queried = reader.lookup(keyOut.toSlice().getHeapMemory()); + Assertions.assertNotNull(queried); + Assertions.assertEquals(key, MemorySlice.wrap(queried).readInt(0)); + } + + // 2. lookup boundaries + keyOut.reset(); + keyOut.writeInt(0); + byte[] queried = reader.lookup(keyOut.toSlice().getHeapMemory()); + Assertions.assertNotNull(queried); + Assertions.assertEquals(0, MemorySlice.wrap(queried).readInt(0)); + + keyOut.reset(); + keyOut.writeInt(511); + byte[] queried1 = reader.lookup(keyOut.toSlice().getHeapMemory()); + Assertions.assertNotNull(queried1); + Assertions.assertEquals(511, MemorySlice.wrap(queried1).readInt(0)); + + keyOut.reset(); + keyOut.writeInt(4999); + byte[] queried2 = reader.lookup(keyOut.toSlice().getHeapMemory()); + Assertions.assertNotNull(queried2); + Assertions.assertEquals(4999, MemorySlice.wrap(queried2).readInt(0)); + + // 2. lookup key smaller than first key + for (int i = 0; i < 100; i++) { + keyOut.reset(); + keyOut.writeInt(-10 - i); + Assertions.assertNull(reader.lookup(keyOut.toSlice().getHeapMemory())); + } + + // 3. lookup key greater than last key + for (int i = 0; i < 100; i++) { + keyOut.reset(); + keyOut.writeInt(10000 + i); + Assertions.assertNull(reader.lookup(keyOut.toSlice().getHeapMemory())); + } + } + } + + @TestTemplate + public void testFullScan() throws Exception { + writeData(5000, false); + + long fileSize = fileIO.getFileSize(file); + try (SeekableInputStream inputStream = fileIO.newInputStream(file); + BlockCache blockCache = new BlockCache(file, inputStream, CACHE_MANAGER); + SstFileScanReader reader = + new SstFileScanReader( + inputStream, + Comparator.comparingInt(slice -> slice.readInt(0)), + fileSize, + file, + blockCache); ) { + assertScan(0, reader); + } + } + + @TestTemplate + public void testSeekAndScan() throws Exception { + writeData(5000, false); + + long fileSize = fileIO.getFileSize(file); + try (SeekableInputStream inputStream = fileIO.newInputStream(file); + BlockCache blockCache = new BlockCache(file, inputStream, CACHE_MANAGER); + SstFileScanReader reader = + new SstFileScanReader( + inputStream, + Comparator.comparingInt(slice -> slice.readInt(0)), + fileSize, + file, + blockCache); ) { + MemorySliceOutput keyOut = new MemorySliceOutput(4); + int startPos; + + // 1. seek to each data block + for (int start = BLOCK_SIZE / 10 / 2; start < 5000; start += BLOCK_SIZE / 10) { + keyOut.reset(); + keyOut.writeInt(start); + startPos = reader.seekTo(keyOut.toSlice().getHeapMemory()); + Assertions.assertEquals(start, startPos); + assertScan(startPos, reader); + } + + // 2. seek to boundaries should behave well + keyOut.reset(); + keyOut.writeInt(0); + startPos = reader.seekTo(keyOut.toSlice().getHeapMemory()); + Assertions.assertEquals(0, startPos); + assertScan(startPos, reader); + + keyOut.reset(); + keyOut.writeInt(BLOCK_SIZE / 10 - 1); + startPos = reader.seekTo(keyOut.toSlice().getHeapMemory()); + Assertions.assertEquals(BLOCK_SIZE / 10 - 1, startPos); + assertScan(startPos, reader); + + keyOut.reset(); + keyOut.writeInt(4999); + startPos = reader.seekTo(keyOut.toSlice().getHeapMemory()); + Assertions.assertEquals(4999, startPos); + assertScan(startPos, reader); + + // 3. seek will reset the iterator + keyOut.reset(); + keyOut.writeInt(2000); + startPos = reader.seekTo(keyOut.toSlice().getHeapMemory()); + Assertions.assertEquals(2000, startPos); + assertScan(startPos, reader); + + // 4. seeking to some key smaller than the first key is equal to full scan + keyOut.reset(); + keyOut.writeInt(-8); + startPos = reader.seekTo(keyOut.toSlice().getHeapMemory()); + Assertions.assertEquals(0, startPos); + assertScan(startPos, reader); + + // 5. seeking to some key greater than the last key will return null immediately + keyOut.reset(); + keyOut.writeInt(6000); + startPos = reader.seekTo(keyOut.toSlice().getHeapMemory()); + Assertions.assertEquals(-1, startPos); + Assertions.assertNull(reader.readBatch()); + } + } + + @TestTemplate + public void testSeekToPositionAndScan() throws Exception { + writeData(5000, false); + + long fileSize = fileIO.getFileSize(file); + try (SeekableInputStream inputStream = fileIO.newInputStream(file); + BlockCache blockCache = new BlockCache(file, inputStream, CACHE_MANAGER); + SstFileScanReader reader = + new SstFileScanReader( + inputStream, + Comparator.comparingInt(slice -> slice.readInt(0)), + fileSize, + file, + blockCache); ) { + + // 1. seek to each block + for (int start = BLOCK_SIZE / 10 / 2; start < 5000; start += BLOCK_SIZE / 10) { + reader.seekTo(start); + assertScan(start, reader); + } + + // 2. seek to boundaries + // zero + reader.seekTo(0); + assertScan(0, reader); + // last position + reader.seekTo(4999); + assertScan(4999, reader); + // first position of an inner block + reader.seekTo(512); + assertScan(512, reader); + // last position of an inner block + reader.seekTo(767); + assertScan(767, reader); + + // 3. seek out of boundary + Assertions.assertThrows(IndexOutOfBoundsException.class, () -> reader.seekTo(-1)); + Assertions.assertThrows(IndexOutOfBoundsException.class, () -> reader.seekTo(5000)); + } + } + + private static void assertScan(int startPosition, SstFileScanReader reader) throws Exception { + int count = startPosition; + Iterator iter; + while ((iter = reader.readBatch()) != null) { + while (iter.hasNext()) { + BlockEntry entry = iter.next(); + Assertions.assertEquals(count, entry.getKey().readInt(0)); + Assertions.assertEquals(count, entry.getValue().readInt(0)); + count++; + } + } + } +} diff --git a/paimon-format/src/main/java/org/apache/paimon/format/sst/SstFileFormat.java b/paimon-format/src/main/java/org/apache/paimon/format/sst/SstFileFormat.java new file mode 100644 index 000000000000..fe2cfdbdd22b --- /dev/null +++ b/paimon-format/src/main/java/org/apache/paimon/format/sst/SstFileFormat.java @@ -0,0 +1,208 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.format.sst; + +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.format.FileFormat; +import org.apache.paimon.format.FileFormatFactory.FormatContext; +import org.apache.paimon.format.FormatReaderFactory; +import org.apache.paimon.format.FormatWriter; +import org.apache.paimon.format.FormatWriterFactory; +import org.apache.paimon.fs.PositionOutputStream; +import org.apache.paimon.options.MemorySize; +import org.apache.paimon.options.Options; +import org.apache.paimon.predicate.Predicate; +import org.apache.paimon.reader.FileRecordReader; +import org.apache.paimon.types.DataType; +import org.apache.paimon.types.DataTypeRoot; +import org.apache.paimon.types.RowType; +import org.apache.paimon.utils.BloomFilter; +import org.apache.paimon.utils.RoaringBitmap32; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +/** + * A {@link FileFormat} for SST Files. Please refer to {@link org.apache.paimon.sst.SstFileWriter} + * for more information. + */ +public class SstFileFormat extends FileFormat { + private final Options options; + private final MemorySize writeBatchMemory; + + public SstFileFormat(FormatContext context) { + super(SstFileFormatFactory.IDENTIFIER); + this.options = context.options(); + this.writeBatchMemory = context.writeBatchMemory(); + } + + @Override + public FormatReaderFactory createReaderFactory( + RowType dataSchemaRowType, + RowType projectedRowType, + @Nullable List filters) { + throw new RuntimeException( + "SST files are row-oriented kv store, please specify key type and value type on creating factories."); + } + + @Override + public FormatWriterFactory createWriterFactory(RowType type) { + throw new RuntimeException( + "SST files are row-oriented kv store, please specify key type and value type on creating factories."); + } + + @Override + public FormatReaderFactory createReaderFactory( + RowType dataSchemaRowType, + RowType projectedRowType, + @Nullable List filters, + RowType keyType, + RowType valueType) { + return new SstFileFormatReaderFactory(projectedRowType, keyType, valueType); + } + + @Override + public FormatWriterFactory createWriterFactory( + RowType rowType, RowType keyType, RowType valueType) { + return new SstFileFormatWriterFactory( + options, rowType, keyType, valueType, writeBatchMemory); + } + + @Override + public void validateDataFields(RowType rowType) { + List fieldTypes = rowType.getFieldTypes(); + for (DataType dataType : fieldTypes) { + validateDataType(dataType); + } + } + + private void validateDataType(DataType dataType) { + // SST Files will serialize values into bytes, so that actually all data types are + // supported. + // todo: check key types are comparable + DataTypeRoot typeRoot = dataType.getTypeRoot(); + switch (typeRoot) { + case CHAR: + case VARCHAR: + case BOOLEAN: + case BINARY: + case VARBINARY: + case DECIMAL: + case TINYINT: + case SMALLINT: + case INTEGER: + case BIGINT: + case FLOAT: + case DOUBLE: + case DATE: + case TIME_WITHOUT_TIME_ZONE: + case TIMESTAMP_WITHOUT_TIME_ZONE: + case TIMESTAMP_WITH_LOCAL_TIME_ZONE: + case ARRAY: + case MAP: + case ROW: + // All types are supported in SST Files + break; + default: + throw new UnsupportedOperationException( + "Unsupported data type for SST format: " + dataType); + } + } + + /** The {@link FormatReaderFactory} for SST Files. */ + private static class SstFileFormatReaderFactory implements FormatReaderFactory { + private final RowType projectedRowType; + private final RowType keyType; + private final RowType valueType; + + public SstFileFormatReaderFactory( + RowType projectedRowType, RowType keyType, RowType valueType) { + this.projectedRowType = projectedRowType; + this.keyType = keyType; + this.valueType = valueType; + } + + @Override + public FileRecordReader createReader(Context context) throws IOException { + return new SstFormatReader( + context.fileIO(), + context.filePath(), + context.fileSize(), + convertSelection(context.selection()), + projectedRowType, + keyType, + valueType); + } + + private List convertSelection(RoaringBitmap32 selection) { + if (selection == null) { + return null; + } + List result = new ArrayList<>(); + selection.iterator().forEachRemaining(result::add); + return result; + } + } + + /** The {@link FormatWriterFactory} for SST Files. */ + private static class SstFileFormatWriterFactory implements FormatWriterFactory { + private final Options options; + private final RowType dataType; + private final RowType keyType; + private final RowType valueType; + private final MemorySize writeBatchMemory; + + public SstFileFormatWriterFactory( + Options options, + RowType dataType, + RowType keyType, + RowType valueType, + MemorySize writeBatchMemory) { + this.options = options; + this.keyType = keyType; + this.valueType = valueType; + this.dataType = dataType; + this.writeBatchMemory = writeBatchMemory; + } + + @Override + public FormatWriter create(PositionOutputStream out, String compression) + throws IOException { + BloomFilter.Builder bloomFilter = null; + boolean enableBloomFilter = options.get(SstOptions.BLOOM_FILTER_ENABLED); + if (enableBloomFilter) { + double fpp = options.get(SstOptions.BLOOM_FILTER_FPP); + int estimatedEntryNum = options.get(SstOptions.BLOOM_FILTER_EXPECTED_ENTRY_NUM); + bloomFilter = BloomFilter.builder(estimatedEntryNum, fpp); + } + + return new SstFormatWriter( + out, + compression, + writeBatchMemory.getBytes(), + bloomFilter, + dataType, + keyType, + valueType); + } + } +} diff --git a/paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortContext.java b/paimon-format/src/main/java/org/apache/paimon/format/sst/SstFileFormatFactory.java similarity index 62% rename from paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortContext.java rename to paimon-format/src/main/java/org/apache/paimon/format/sst/SstFileFormatFactory.java index 5aacb56bb769..81c22b4e44f8 100644 --- a/paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortContext.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/sst/SstFileFormatFactory.java @@ -16,20 +16,22 @@ * limitations under the License. */ -package org.apache.paimon.lookup.sort; +package org.apache.paimon.format.sst; -import org.apache.paimon.lookup.LookupStoreFactory.Context; +import org.apache.paimon.format.FileFormat; +import org.apache.paimon.format.FileFormatFactory; -/** A {@link Context} for sort store. */ -public class SortContext implements Context { +/** The {@link FileFormatFactory} for SST Files. */ +public class SstFileFormatFactory implements FileFormatFactory { + public static final String IDENTIFIER = "sst"; - private final long fileSize; - - public SortContext(long fileSize) { - this.fileSize = fileSize; + @Override + public String identifier() { + return IDENTIFIER; } - public long fileSize() { - return fileSize; + @Override + public FileFormat create(FormatContext formatContext) { + return new SstFileFormat(formatContext); } } diff --git a/paimon-format/src/main/java/org/apache/paimon/format/sst/SstFormatReader.java b/paimon-format/src/main/java/org/apache/paimon/format/sst/SstFormatReader.java new file mode 100644 index 000000000000..ae6bef118769 --- /dev/null +++ b/paimon-format/src/main/java/org/apache/paimon/format/sst/SstFormatReader.java @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.format.sst; + +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.data.serializer.RowCompactedSerializer; +import org.apache.paimon.fs.FileIO; +import org.apache.paimon.fs.Path; +import org.apache.paimon.fs.SeekableInputStream; +import org.apache.paimon.reader.DataEvolutionRow; +import org.apache.paimon.reader.FileRecordIterator; +import org.apache.paimon.reader.FileRecordReader; +import org.apache.paimon.sst.BlockEntry; +import org.apache.paimon.sst.BlockIterator; +import org.apache.paimon.sst.SstFileScanReader; +import org.apache.paimon.types.DataField; +import org.apache.paimon.types.RowType; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.util.List; + +/** The FormatReader for SST Files. */ +public class SstFormatReader implements FileRecordReader { + private final RowCompactedSerializer keySerializer; + private final RowCompactedSerializer valueSerializer; + + /** + * Use {@link DataEvolutionRow} to combine key row and value row as well as doing projection. + */ + private final DataEvolutionRow resultRow; + + private final SstFileScanReader fileReader; + private final SeekableInputStream inputStream; + private final Path filePath; + @Nullable private final List selection; + int nextSelection = 0; + int returnedPosition = -1; + + public SstFormatReader( + FileIO fileIO, + Path filePath, + long fileSize, + @Nullable List selection, + RowType projectedRowType, + RowType keyType, + RowType valueType) + throws IOException { + this.keySerializer = new RowCompactedSerializer(keyType); + this.valueSerializer = new RowCompactedSerializer(valueType); + this.resultRow = createResultRow(projectedRowType, keyType, valueType); + this.inputStream = fileIO.newInputStream(filePath); + this.filePath = filePath; + // we could set block cache in the future if we want to pre-fetch some data blocks for scan + this.fileReader = + new SstFileScanReader( + inputStream, + keySerializer.createSliceComparator(), + fileSize, + filePath, + null); + this.selection = selection; + } + + private DataEvolutionRow createResultRow(RowType dataType, RowType keyType, RowType valueType) { + final int keyRowInd = 0, valueRowInd = 1; + int[] rowOffsets = new int[dataType.getFieldCount()]; + int[] fieldOffsets = new int[dataType.getFieldCount()]; + + List dataFields = dataType.getFields(); + for (int i = 0; i < dataFields.size(); i++) { + String fieldName = dataFields.get(i).name(); + int fieldIndex = keyType.getFieldIndex(fieldName); + if (fieldIndex >= 0) { + rowOffsets[i] = keyRowInd; + } else { + fieldIndex = valueType.getFieldIndex(fieldName); + if (fieldIndex < 0) { + throw new RuntimeException( + String.format( + "Field %s is not found in key type nor value type.", + fieldName)); + } + rowOffsets[i] = valueRowInd; + } + fieldOffsets[i] = fieldIndex; + } + + return new DataEvolutionRow(2, rowOffsets, fieldOffsets); + } + + @Nullable + @Override + public FileRecordIterator readBatch() throws IOException { + // if selection is not null, we need to seek to the next position + if (selection != null) { + if (selectionEnd()) { + return null; + } + fileReader.seekTo(getNextPosition()); + } + + BlockIterator blockIterator = fileReader.readBatch(); + + if (blockIterator == null) { + return null; + } + + final int blockStartPos = getNextPosition() - blockIterator.getRecordPosition(); + + return new FileRecordIterator() { + @Override + public long returnedPosition() { + return returnedPosition; + } + + @Override + public Path filePath() { + return filePath; + } + + @Nullable + @Override + public InternalRow next() { + if (!blockIterator.hasNext() || selectionEnd()) { + return null; + } + + int nextPosition = getNextPosition(); + if (selection != null) { + // if next position is in another data block, return null immediately + if (nextPosition >= blockStartPos + blockIterator.getRecordCount()) { + return null; + } + // else we need to seek to data block inner position + blockIterator.seekTo(nextPosition - blockStartPos); + nextSelection++; + } + returnedPosition = nextPosition; + + return readNext(); + } + + private InternalRow readNext() { + BlockEntry entry = blockIterator.next(); + resultRow.setRows( + new InternalRow[] { + keySerializer.deserialize(entry.getKey()), + valueSerializer.deserialize(entry.getValue()) + }); + return resultRow; + } + + @Override + public void releaseBatch() { + // currently nothing to do + } + }; + } + + @Override + public void close() throws IOException { + inputStream.close(); + fileReader.close(); + } + + private boolean selectionEnd() { + return selection != null && nextSelection >= selection.size(); + } + + private int getNextPosition() { + return selection == null ? returnedPosition + 1 : selection.get(nextSelection); + } +} diff --git a/paimon-format/src/main/java/org/apache/paimon/format/sst/SstFormatWriter.java b/paimon-format/src/main/java/org/apache/paimon/format/sst/SstFormatWriter.java new file mode 100644 index 000000000000..89ca60263487 --- /dev/null +++ b/paimon-format/src/main/java/org/apache/paimon/format/sst/SstFormatWriter.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.format.sst; + +import org.apache.paimon.compression.BlockCompressionFactory; +import org.apache.paimon.compression.BlockCompressionType; +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.data.serializer.RowCompactedSerializer; +import org.apache.paimon.format.FormatWriter; +import org.apache.paimon.fs.PositionOutputStream; +import org.apache.paimon.sst.SstFileWriter; +import org.apache.paimon.types.RowType; +import org.apache.paimon.utils.BloomFilter; +import org.apache.paimon.utils.ProjectedRow; + +import javax.annotation.Nullable; + +import java.io.IOException; + +/** The {@link FormatWriter} for SST file. */ +public class SstFormatWriter implements FormatWriter { + private final SstFileWriter fileWriter; + private final ProjectedRow reusedKey; + private final ProjectedRow reusedValue; + private final RowCompactedSerializer keySerializer; + private final RowCompactedSerializer valueSerializer; + + public SstFormatWriter( + PositionOutputStream out, + String compression, + long blockSize, + @Nullable BloomFilter.Builder bloomFilter, + RowType dataType, + RowType keyType, + RowType valueType) { + BlockCompressionFactory compressionFactory = + BlockCompressionFactory.create( + BlockCompressionType.getCompressionTypeByValue(compression)); + this.fileWriter = new SstFileWriter(out, (int) blockSize, bloomFilter, compressionFactory); + this.reusedKey = ProjectedRow.from(keyType, dataType); + this.reusedValue = ProjectedRow.from(valueType, dataType); + this.keySerializer = new RowCompactedSerializer(keyType); + this.valueSerializer = new RowCompactedSerializer(valueType); + } + + @Override + public void addElement(InternalRow element) throws IOException { + reusedKey.replaceRow(element); + reusedValue.replaceRow(element); + fileWriter.put( + keySerializer.serializeToBytes(reusedKey), + valueSerializer.serializeToBytes(reusedValue)); + } + + @Override + public boolean reachTargetSize(boolean suggestedCheck, long targetSize) throws IOException { + return suggestedCheck && fileWriter.getTotalCompressedSize() >= targetSize; + } + + @Override + public void close() throws IOException { + fileWriter.close(); + } +} diff --git a/paimon-format/src/main/java/org/apache/paimon/format/sst/SstOptions.java b/paimon-format/src/main/java/org/apache/paimon/format/sst/SstOptions.java new file mode 100644 index 000000000000..3b88f052e73e --- /dev/null +++ b/paimon-format/src/main/java/org/apache/paimon/format/sst/SstOptions.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.format.sst; + +import org.apache.paimon.options.ConfigOption; +import org.apache.paimon.options.ConfigOptions; + +/** Options for SST Format. */ +public class SstOptions { + + public static final ConfigOption BLOOM_FILTER_ENABLED = + ConfigOptions.key("sst.bloom-filter.enabled") + .booleanType() + .defaultValue(false) + .withDescription("Whether to enable the bloom filter for SST Files."); + + public static final ConfigOption BLOOM_FILTER_FPP = + ConfigOptions.key("sst.bloom-filter.fpp") + .doubleType() + .defaultValue(0.05) + .withDescription( + "Define the default false positive probability for SST Files bloom filters."); + + public static final ConfigOption BLOOM_FILTER_EXPECTED_ENTRY_NUM = + ConfigOptions.key("sst.bloom-filter.expected-entry-num") + .intType() + .defaultValue(1000_000) + .withDescription( + "Defines the estimated entry num of bloom filter, user should set this" + + " value slightly greater than normal file record num."); +} diff --git a/paimon-format/src/test/java/org/apache/paimon/format/sst/SstFileFormatTest.java b/paimon-format/src/test/java/org/apache/paimon/format/sst/SstFileFormatTest.java new file mode 100644 index 000000000000..ca552a3805cf --- /dev/null +++ b/paimon-format/src/test/java/org/apache/paimon/format/sst/SstFileFormatTest.java @@ -0,0 +1,243 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.format.sst; + +import org.apache.paimon.data.BinaryString; +import org.apache.paimon.data.GenericRow; +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.format.FileFormat; +import org.apache.paimon.format.FileFormatFactory; +import org.apache.paimon.format.FormatReaderContext; +import org.apache.paimon.format.FormatReaderFactory; +import org.apache.paimon.format.FormatWriter; +import org.apache.paimon.format.FormatWriterFactory; +import org.apache.paimon.fs.FileIO; +import org.apache.paimon.fs.Path; +import org.apache.paimon.fs.PositionOutputStream; +import org.apache.paimon.fs.local.LocalFileIO; +import org.apache.paimon.options.MemorySize; +import org.apache.paimon.options.Options; +import org.apache.paimon.reader.FileRecordIterator; +import org.apache.paimon.reader.FileRecordReader; +import org.apache.paimon.types.DataField; +import org.apache.paimon.types.DoubleType; +import org.apache.paimon.types.IntType; +import org.apache.paimon.types.RowType; +import org.apache.paimon.types.VarCharType; +import org.apache.paimon.utils.ProjectedRow; +import org.apache.paimon.utils.RoaringBitmap32; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Iterator; +import java.util.List; +import java.util.Random; +import java.util.UUID; + +/** + * Test class for {@link SstFileFormat}. This test is temporarily disabled for the reason that it + * can't find classes in {@code org.apache.paimon.shaded.io.airlift} package. Currently, the airlift + * package is shaded and reallocated by paimon-format module, but during the test process, the + * airlift package has not been reallocated. So we can not find classes in {@code + * org.apache.paimon.shaded.io.airlift} + */ +@Disabled("Temporarily disabled due to ClassNotFoundException.") +public class SstFileFormatTest { + private static final Logger LOG = LoggerFactory.getLogger(SstFileFormatTest.class); + + private static final RowType ROW_TYPE = + RowType.of( + new DataField(0, "id", new IntType()), + new DataField(1, "name", new VarCharType()), + new DataField(2, "val1", new DoubleType()), + new DataField(3, "val2", new VarCharType())); + private static final RowType KEY_TYPE = + RowType.of( + new DataField(0, "id", new IntType()), + new DataField(1, "name", new VarCharType())); + + private static final RowType VALUE_TYPE = + RowType.of( + new DataField(2, "val1", new DoubleType()), + new DataField(3, "val2", new VarCharType())); + @TempDir java.nio.file.Path tempPath; + + protected FileIO fileIO; + protected Path file; + protected Path parent; + protected Options options; + + @BeforeEach + public void beforeEach() { + this.fileIO = LocalFileIO.create(); + this.parent = new Path(tempPath.toUri()); + this.file = new Path(new Path(tempPath.toUri()), UUID.randomUUID().toString()); + this.options = new Options(); + } + + @Test + public void testFullScan() throws Exception { + final int recordNum = 1000000; + writeData(recordNum); + FileFormat format = createFormat(); + FormatReaderContext context = + new FormatReaderContext(fileIO, file, fileIO.getFileSize(file)); + + FormatReaderFactory readerFactory; + + // 1. no projection + readerFactory = format.createReaderFactory(ROW_TYPE, ROW_TYPE, null, KEY_TYPE, VALUE_TYPE); + try (FileRecordReader reader = readerFactory.createReader(context); ) { + assertScan(reader, null, ROW_TYPE, recordNum); + } + + RowType projection = + RowType.of( + new DataField(3, "val2", new VarCharType()), + new DataField(0, "id", new IntType())); + // 2. with projection + readerFactory = + format.createReaderFactory(ROW_TYPE, projection, null, KEY_TYPE, VALUE_TYPE); + try (FileRecordReader reader = readerFactory.createReader(context); ) { + assertScan(reader, null, projection, recordNum); + } + } + + @Test + public void testRandomAccess() throws Exception { + final int recordNum = 100000; + writeData(recordNum); + FileFormat format = createFormat(); + + FormatReaderFactory readerFactory; + + // 1. random selection without projection + readerFactory = format.createReaderFactory(ROW_TYPE, ROW_TYPE, null, KEY_TYPE, VALUE_TYPE); + for (int i = 0; i < 10; i++) { + testRandomSelection(readerFactory, recordNum, 100, ROW_TYPE); + } + + // 2. random selection with projection + RowType projection = + RowType.of( + new DataField(3, "val2", new VarCharType()), + new DataField(0, "id", new IntType())); + readerFactory = + format.createReaderFactory(ROW_TYPE, projection, null, KEY_TYPE, VALUE_TYPE); + for (int i = 0; i < 10; i++) { + testRandomSelection(readerFactory, recordNum, 100, projection); + } + } + + private void testRandomSelection( + FormatReaderFactory readerFactory, int recordNum, int selectionNum, RowType projection) + throws IOException { + RoaringBitmap32 selection = new RoaringBitmap32(); + Random random = new Random(System.currentTimeMillis()); + for (int i = 0; i < selectionNum; i++) { + selection.add(random.nextInt(recordNum)); + } + + FormatReaderContext context = + new FormatReaderContext(fileIO, file, fileIO.getFileSize(file), selection); + try (FileRecordReader reader = readerFactory.createReader(context)) { + assertScan(reader, selection, projection, (int) selection.getCardinality()); + } + } + + private void assertScan( + FileRecordReader reader, + RoaringBitmap32 selection, + RowType projection, + int num) + throws IOException { + long start = System.currentTimeMillis(); + ProjectedRow targetRow = ProjectedRow.from(projection, ROW_TYPE); + FileRecordIterator iter; + Iterator selectionIter = selection == null ? null : selection.iterator(); + + int cnt = 0; + while ((iter = reader.readBatch()) != null) { + InternalRow record; + while ((record = iter.next()) != null) { + int target = selectionIter == null ? cnt : selectionIter.next(); + targetRow.replaceRow(constructRow(target)); + assertRowEqual(targetRow, record, projection); + cnt++; + if (cnt > num) { + throw new AssertionError( + String.format("Expected %s record, but at least found %s", num, cnt)); + } + } + } + if (cnt != num) { + throw new AssertionError( + String.format("Expected %s record, but only found %s", num, cnt)); + } + LOG.info("Scan data cost {} ms", System.currentTimeMillis() - start); + } + + private void writeData(int rowCount) throws IOException { + long start = System.currentTimeMillis(); + FileFormat format = createFormat(); + FormatWriterFactory writerFactory = + format.createWriterFactory(ROW_TYPE, KEY_TYPE, VALUE_TYPE); + try (PositionOutputStream outputStream = fileIO.newOutputStream(file, true); + FormatWriter writer = writerFactory.create(outputStream, "ZSTD"); ) { + for (int i = 0; i < rowCount; i++) { + writer.addElement(constructRow(i)); + } + } + LOG.info("Write {} rows cost {} ms", rowCount, System.currentTimeMillis() - start); + } + + public void assertRowEqual(InternalRow expected, InternalRow actual, RowType rowType) { + Assertions.assertEquals(expected.getFieldCount(), actual.getFieldCount()); + Assertions.assertEquals(expected.getRowKind(), actual.getRowKind()); + List fields = rowType.getFields(); + InternalRow.FieldGetter[] getters = new InternalRow.FieldGetter[fields.size()]; + for (int i = 0; i < fields.size(); i++) { + getters[i] = InternalRow.createFieldGetter(fields.get(i).type(), i); + } + for (InternalRow.FieldGetter getter : getters) { + Assertions.assertEquals(getter.getFieldOrNull(expected), getter.getFieldOrNull(actual)); + } + } + + private InternalRow constructRow(int index) { + return GenericRow.of( + index, + BinaryString.fromString("name" + index), + index * 1.1, + BinaryString.fromString("value" + index)); + } + + private FileFormat createFormat() { + SstFileFormatFactory factory = new SstFileFormatFactory(); + return factory.create( + new FileFormatFactory.FormatContext(options, 0, 0, MemorySize.ofMebiBytes(1))); + } +}