diff --git a/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java b/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java
index b173239332..41e482cfdd 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java
@@ -48,6 +48,7 @@ public class ParquetProperties {
   public static final int DEFAULT_MINIMUM_RECORD_COUNT_FOR_CHECK = 100;
   public static final int DEFAULT_MAXIMUM_RECORD_COUNT_FOR_CHECK = 10000;
   public static final int DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH = 64;
+  public static final int DEFAULT_PAGE_ROW_COUNT_LIMIT = 20_000;

   public static final ValuesWriterFactory DEFAULT_VALUES_WRITER_FACTORY = new DefaultValuesWriterFactory();

@@ -85,10 +86,11 @@ public static WriterVersion fromString(String name) {
   private final ByteBufferAllocator allocator;
   private final ValuesWriterFactory valuesWriterFactory;
   private final int columnIndexTruncateLength;
+  private final int pageRowCountLimit;

   private ParquetProperties(WriterVersion writerVersion, int pageSize, int dictPageSize, boolean enableDict,
                             int minRowCountForPageSizeCheck, int maxRowCountForPageSizeCheck,
                             boolean estimateNextSizeCheck, ByteBufferAllocator allocator,
-                            ValuesWriterFactory writerFactory, int columnIndexMinMaxTruncateLength) {
+                            ValuesWriterFactory writerFactory, int columnIndexMinMaxTruncateLength, int pageRowCountLimit) {
     this.pageSizeThreshold = pageSize;
     this.initialSlabSize = CapacityByteArrayOutputStream
       .initialSlabSizeHeuristic(MIN_SLAB_SIZE, pageSizeThreshold, 10);
@@ -102,6 +104,7 @@ private ParquetProperties(WriterVersion writerVersion, int pageSize, int dictPag

     this.valuesWriterFactory = writerFactory;
     this.columnIndexTruncateLength = columnIndexMinMaxTruncateLength;
+    this.pageRowCountLimit = pageRowCountLimit;
   }

   public ValuesWriter newRepetitionLevelWriter(ColumnDescriptor path) {
@@ -194,6 +197,10 @@ public boolean estimateNextSizeCheck() {
     return estimateNextSizeCheck;
   }

+  public int getPageRowCountLimit() {
+    return pageRowCountLimit;
+  }
+
   public static Builder builder() {
     return new Builder();
   }
@@ -213,18 +220,22 @@ public static class Builder {
     private ByteBufferAllocator allocator = new HeapByteBufferAllocator();
     private ValuesWriterFactory valuesWriterFactory = DEFAULT_VALUES_WRITER_FACTORY;
     private int columnIndexTruncateLength = DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH;
+    private int pageRowCountLimit = DEFAULT_PAGE_ROW_COUNT_LIMIT;

     private Builder() {
     }

     private Builder(ParquetProperties toCopy) {
+      this.pageSize = toCopy.pageSizeThreshold;
       this.enableDict = toCopy.enableDictionary;
       this.dictPageSize = toCopy.dictionaryPageSizeThreshold;
       this.writerVersion = toCopy.writerVersion;
       this.minRowCountForPageSizeCheck = toCopy.minRowCountForPageSizeCheck;
       this.maxRowCountForPageSizeCheck = toCopy.maxRowCountForPageSizeCheck;
       this.estimateNextSizeCheck = toCopy.estimateNextSizeCheck;
+      this.valuesWriterFactory = toCopy.valuesWriterFactory;
       this.allocator = toCopy.allocator;
+      this.pageRowCountLimit = toCopy.pageRowCountLimit;
     }

     /**
@@ -313,11 +324,17 @@ public Builder withColumnIndexTruncateLength(int length) {
       return this;
     }

+    public Builder withPageRowCountLimit(int rowCount) {
+      Preconditions.checkArgument(rowCount > 0, "Invalid row count limit for pages: " + rowCount);
+      pageRowCountLimit = rowCount;
+      return this;
+    }
+
     public ParquetProperties build() {
       ParquetProperties properties =
         new ParquetProperties(writerVersion, pageSize, dictPageSize,
             enableDict, minRowCountForPageSizeCheck, maxRowCountForPageSizeCheck,
-            estimateNextSizeCheck, allocator, valuesWriterFactory, columnIndexTruncateLength);
+            estimateNextSizeCheck, allocator, valuesWriterFactory, columnIndexTruncateLength, pageRowCountLimit);
       // we pass a constructed but uninitialized factory to ParquetProperties above as currently
       // creation of ValuesWriters is invoked from within ParquetProperties. In the future
       // we'd like to decouple that and won't need to pass an object to properties and then pass the
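
A side note on usage: the new knob composes with the existing builder chain, so caller code needs nothing beyond what the hunks above add. A minimal sketch (the concrete values here are arbitrary illustrations, not recommendations):

    ParquetProperties props = ParquetProperties.builder()
        .withPageSize(1024 * 1024)     // existing per-page byte threshold
        .withPageRowCountLimit(20_000) // new: upper bound on rows per page
        .build();
    // the getter added above exposes the configured limit
    assert props.getPageRowCountLimit() == 20_000;
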
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreBase.java b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreBase.java
index 5cd7d876e4..f79c09de2f 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreBase.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreBase.java
@@ -67,7 +67,7 @@ private interface ColumnWriterProvider {

     this.columns = new TreeMap<>();

-    this.rowCountForNextSizeCheck = props.getMinRowCountForPageSizeCheck();
+    this.rowCountForNextSizeCheck = min(props.getMinRowCountForPageSizeCheck(), props.getPageRowCountLimit());

     columnWriterProvider = new ColumnWriterProvider() {
       @Override
@@ -95,7 +95,7 @@ public ColumnWriter getColumnWriter(ColumnDescriptor path) {
     }
     this.columns = unmodifiableMap(mcolumns);

-    this.rowCountForNextSizeCheck = props.getMinRowCountForPageSizeCheck();
+    this.rowCountForNextSizeCheck = min(props.getMinRowCountForPageSizeCheck(), props.getPageRowCountLimit());

     columnWriterProvider = new ColumnWriterProvider() {
       @Override
@@ -190,13 +190,17 @@ public void endRecord() {

   private void sizeCheck() {
     long minRecordToWait = Long.MAX_VALUE;
+    int pageRowCountLimit = props.getPageRowCountLimit();
+    long rowCountForNextRowCountCheck = rowCount + pageRowCountLimit;
     for (ColumnWriterBase writer : columns.values()) {
       long usedMem = writer.getCurrentPageBufferedSize();
       long rows = rowCount - writer.getRowsWrittenSoFar();
       long remainingMem = props.getPageSizeThreshold() - usedMem;
-      if (remainingMem <= thresholdTolerance) {
+      if (remainingMem <= thresholdTolerance || rows >= pageRowCountLimit) {
         writer.writePage();
         remainingMem = props.getPageSizeThreshold();
+      } else {
+        rowCountForNextRowCountCheck = min(rowCountForNextRowCountCheck, rowCount + (pageRowCountLimit - rows));
       }
       long rowsToFillPage =
           usedMem == 0 ?
@@ -219,5 +223,10 @@ private void sizeCheck() {
     } else {
       rowCountForNextSizeCheck = rowCount + props.getMinRowCountForPageSizeCheck();
     }
+
+    // Do the check earlier if required to keep the row count limit
+    if (rowCountForNextRowCountCheck < rowCountForNextSizeCheck) {
+      rowCountForNextSizeCheck = rowCountForNextRowCountCheck;
+    }
   }
 }
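
The scheduling in sizeCheck() is worth spelling out: rather than checking on every record, the store computes the earliest row at which any still-open page could reach the limit and pulls the next check forward to that point. A simplified standalone model of that computation (illustrative names, not Parquet API):

    // rowCount: records written so far; bufferedRows[i]: rows buffered in column i's
    // current page (all below the limit, because pages at the limit were just flushed)
    static long nextRowCountCheck(long rowCount, long[] bufferedRows, int pageRowCountLimit) {
      long next = rowCount + pageRowCountLimit; // bound for a freshly started page
      for (long rows : bufferedRows) {
        next = Math.min(next, rowCount + (pageRowCountLimit - rows)); // earliest possible offender
      }
      return next;
    }

With a limit of 10 and a column already buffering 7 rows, the next check runs 3 records later, so no page can overshoot the limit between two checks.
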
diff --git a/parquet-column/src/test/java/org/apache/parquet/column/mem/TestMemColumn.java b/parquet-column/src/test/java/org/apache/parquet/column/mem/TestMemColumn.java
index e5db38c945..f89d0cbf7a 100644
--- a/parquet-column/src/test/java/org/apache/parquet/column/mem/TestMemColumn.java
+++ b/parquet-column/src/test/java/org/apache/parquet/column/mem/TestMemColumn.java
@@ -18,19 +18,28 @@
  */
 package org.apache.parquet.column.mem;

+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;

 import org.apache.parquet.column.ColumnDescriptor;
 import org.apache.parquet.column.ColumnReader;
+import org.apache.parquet.column.ColumnWriteStore;
 import org.apache.parquet.column.ColumnWriter;
 import org.apache.parquet.column.ParquetProperties;
 import org.apache.parquet.column.impl.ColumnReadStoreImpl;
 import org.apache.parquet.column.impl.ColumnWriteStoreV1;
+import org.apache.parquet.column.impl.ColumnWriteStoreV2;
+import org.apache.parquet.column.page.DataPage;
+import org.apache.parquet.column.page.PageReader;
 import org.apache.parquet.column.page.mem.MemPageStore;
 import org.apache.parquet.example.DummyRecordConverter;
 import org.apache.parquet.io.api.Binary;
 import org.apache.parquet.schema.MessageType;
 import org.apache.parquet.schema.MessageTypeParser;
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
+import org.apache.parquet.schema.Types;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -166,6 +175,68 @@ public void testMemColumnSeveralPagesRepeated() throws Exception {
     }
   }

+  @Test
+  public void testPageSize() {
+    MessageType schema = Types.buildMessage()
+        .requiredList().requiredElement(BINARY).named("binary_col")
+        .requiredList().requiredElement(INT32).named("int32_col")
+        .named("msg");
+    LOG.info("Testing schema: {}", schema);
+    MemPageStore memPageStore = new MemPageStore(123);
+
+    // Using V2 pages so we have rowCount info
+    ColumnWriteStore writeStore = new ColumnWriteStoreV2(schema, memPageStore, ParquetProperties.builder()
+        .withPageSize(1024) // Less than 10 records for binary_col
+        .withMinRowCountForPageSizeCheck(1) // Enforce having precise page sizing
+        .withPageRowCountLimit(10)
+        .withDictionaryEncoding(false) // Enforce having large binary_col pages
+        .build());
+    ColumnDescriptor binaryCol = schema.getColumnDescription(new String[] { "binary_col", "list", "element" });
+    ColumnWriter binaryColWriter = writeStore.getColumnWriter(binaryCol);
+    ColumnDescriptor int32Col = schema.getColumnDescription(new String[] { "int32_col", "list", "element" });
+    ColumnWriter int32ColWriter = writeStore.getColumnWriter(int32Col);
+    // Writing 123 records
+    for (int i = 0; i < 123; ++i) {
+      // Writing 10 values per record
+      for (int j = 0; j < 10; ++j) {
+        binaryColWriter.write(Binary.fromString("aaaaaaaaaaaa"), j == 0 ? 0 : 2, 2);
+        int32ColWriter.write(42, j == 0 ? 0 : 2, 2);
+      }
+      writeStore.endRecord();
+    }
+    writeStore.flush();
+
+    // Check that all the binary_col pages are <= 1024 bytes
+    {
+      PageReader binaryColPageReader = memPageStore.getPageReader(binaryCol);
+      assertEquals(1230, binaryColPageReader.getTotalValueCount());
+      int pageCnt = 0;
+      int valueCnt = 0;
+      while (valueCnt < binaryColPageReader.getTotalValueCount()) {
+        DataPage page = binaryColPageReader.readPage();
+        ++pageCnt;
+        valueCnt += page.getValueCount();
+        LOG.info("binary_col page-{}: {} bytes, {} rows", pageCnt, page.getCompressedSize(), page.getIndexRowCount().get());
+        assertTrue("Compressed size should be less than or equal to 1024", page.getCompressedSize() <= 1024);
+      }
+    }
+
+    // Check that all the int32_col pages contain <= 10 rows
+    {
+      PageReader int32ColPageReader = memPageStore.getPageReader(int32Col);
+      assertEquals(1230, int32ColPageReader.getTotalValueCount());
+      int pageCnt = 0;
+      int valueCnt = 0;
+      while (valueCnt < int32ColPageReader.getTotalValueCount()) {
+        DataPage page = int32ColPageReader.readPage();
+        ++pageCnt;
+        valueCnt += page.getValueCount();
+        LOG.info("int32_col page-{}: {} bytes, {} rows", pageCnt, page.getCompressedSize(), page.getIndexRowCount().get());
+        assertTrue("Row count should be less than or equal to 10", page.getIndexRowCount().get() <= 10);
+      }
+    }
+  }
+
   private ColumnWriteStoreV1 newColumnWriteStoreImpl(MemPageStore memPageStore) {
     return new ColumnWriteStoreV1(memPageStore,
         ParquetProperties.builder()
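
A quick sanity check on the numbers the test asserts: 123 records under a 10-row limit force int32_col into at least ceil(123 / 10) = 13 pages, while binary_col splits even sooner because each record carries roughly 10 x 12 bytes of string payload against the 1024-byte page threshold. In code form:

    int records = 123, rowLimit = 10;
    int minInt32Pages = (records + rowLimit - 1) / rowLimit; // ceil(123 / 10) = 13
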
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java
index 0789bf50d4..04cbd15c0b 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java
@@ -144,6 +144,7 @@ public static enum JobSummaryLevel {
   public static final String MAX_ROW_COUNT_FOR_PAGE_SIZE_CHECK = "parquet.page.size.row.check.max";
   public static final String ESTIMATE_PAGE_SIZE_CHECK = "parquet.page.size.check.estimate";
   public static final String COLUMN_INDEX_TRUNCATE_LENGTH = "parquet.columnindex.truncate.length";
+  public static final String PAGE_ROW_COUNT_LIMIT = "parquet.page.row.count.limit";

   public static JobSummaryLevel getJobSummaryLevel(Configuration conf) {
     String level = conf.get(JOB_SUMMARY_LEVEL);
@@ -325,6 +326,18 @@ private static int getColumnIndexTruncateLength(Configuration conf) {
     return conf.getInt(COLUMN_INDEX_TRUNCATE_LENGTH, ParquetProperties.DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH);
   }

+  public static void setPageRowCountLimit(JobContext jobContext, int rowCount) {
+    setPageRowCountLimit(getConfiguration(jobContext), rowCount);
+  }
+
+  public static void setPageRowCountLimit(Configuration conf, int rowCount) {
+    conf.setInt(PAGE_ROW_COUNT_LIMIT, rowCount);
+  }
+
+  private static int getPageRowCountLimit(Configuration conf) {
+    return conf.getInt(PAGE_ROW_COUNT_LIMIT, ParquetProperties.DEFAULT_PAGE_ROW_COUNT_LIMIT);
+  }
+
   private WriteSupport<T> writeSupport;
   private ParquetOutputCommitter committer;

@@ -380,6 +393,7 @@ public RecordWriter<Void, T> getRecordWriter(Configuration conf, Path file, Comp
         .withMinRowCountForPageSizeCheck(getMinRowCountForPageSizeCheck(conf))
         .withMaxRowCountForPageSizeCheck(getMaxRowCountForPageSizeCheck(conf))
         .withColumnIndexTruncateLength(getColumnIndexTruncateLength(conf))
+        .withPageRowCountLimit(getPageRowCountLimit(conf))
         .build();

     long blockSize = getLongBlockSize(conf);
@@ -398,6 +412,7 @@ public RecordWriter<Void, T> getRecordWriter(Configuration conf, Path file, Comp
       LOG.info("Min row count for page size check is: {}", props.getMinRowCountForPageSizeCheck());
       LOG.info("Max row count for page size check is: {}", props.getMaxRowCountForPageSizeCheck());
       LOG.info("Truncate length for column indexes is: {}", props.getColumnIndexTruncateLength());
+      LOG.info("Page row count limit is: {}", props.getPageRowCountLimit());
     }

     WriteContext init = writeSupport.init(conf);
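
For MapReduce jobs, the typed setter and the raw configuration key are interchangeable; both end up in the same Hadoop property (a usage sketch; the surrounding job wiring is omitted):

    Configuration conf = new Configuration();
    ParquetOutputFormat.setPageRowCountLimit(conf, 10_000);
    // equivalent to setting the key directly:
    conf.setInt("parquet.page.row.count.limit", 10_000);
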
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
index 5b0e4f82d1..1ed5e32ca7 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
@@ -425,6 +425,17 @@ public SELF withPageSize(int pageSize) {
       return self();
     }

+    /**
+     * Sets the Parquet format page row count limit used by the constructed writer.
+     *
+     * @param rowCount limit for the number of rows stored in a page
+     * @return this builder for method chaining
+     */
+    public SELF withPageRowCountLimit(int rowCount) {
+      encodingPropsBuilder.withPageRowCountLimit(rowCount);
+      return self();
+    }
+
     /**
      * Set the Parquet format dictionary page size used by the constructed
      * writer.
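
Because the method lives on the shared builder base and returns SELF, every concrete ParquetWriter builder picks it up unchanged. For instance, with the example writer from parquet-hadoop (ExampleParquetWriter and the schema variable are assumptions for illustration, not part of this patch):

    ParquetWriter<Group> writer = ExampleParquetWriter.builder(new Path("/tmp/data.parquet"))
        .withType(schema)              // assumed: a MessageType built elsewhere
        .withPageRowCountLimit(10_000) // option added by this patch
        .build();
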