Skip to content

Commit 54ac1eb

Browse files
omega-bigstreamwesm
authored andcommitted
commit 9eecaaf
Author: Omega Gamage <[email protected]> Date: Tue Feb 18 14:23:08 2020 +0530 used std::map instead of std::unordered_map to store num_data_pages commit 6822fc6 Author: Omega Gamage <[email protected]> Date: Mon Feb 17 11:45:03 2020 +0530 remove default arguments for page number counts in ColumnChunkMetaDataBuilder::Finish commit 49e1861 Author: Omega Gamage <[email protected]> Date: Fri Feb 14 16:22:41 2020 +0530 Added the class PageEncodingStats to types.h commit 9dc5b6b Merge: 9b776f3 d65a71a Author: Omega Gamage <[email protected]> Date: Wed Feb 12 12:25:50 2020 +0530 resolved merge conflicts commit 9b776f3 Author: Omega Gamage <[email protected]> Date: Thu Feb 6 16:12:31 2020 +0530 PARQUET-1780: [C++] Set ColumnMetadata.encoding_stats field Fixed lint errors Use std::map to store datapage count Added unit test to test encoding_stats commit d65a71a Author: Omega Gamage <[email protected]> Date: Thu Feb 6 20:11:09 2020 +0530 Fixed lint errors commit 053ce4d Author: Omega Gamage <[email protected]> Date: Thu Feb 6 16:12:31 2020 +0530 PARQUET-1780: [C++] Set ColumnMetadata.encoding_stats field
1 parent 467129e commit 54ac1eb

9 files changed

+185
-13
lines changed

cpp/src/parquet/column_writer.cc

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
#include <algorithm>
2121
#include <cstdint>
2222
#include <cstring>
23+
#include <map>
2324
#include <memory>
2425
#include <string>
2526
#include <utility>
@@ -167,7 +168,8 @@ class SerializedPageWriter : public PageWriter {
167168
column_ordinal_(column_chunk_ordinal),
168169
meta_encryptor_(std::move(meta_encryptor)),
169170
data_encryptor_(std::move(data_encryptor)),
170-
encryption_buffer_(AllocateBuffer(pool, 0)) {
171+
encryption_buffer_(AllocateBuffer(pool, 0)),
172+
num_dict_pages_(0) {
171173
if (data_encryptor_ != nullptr || meta_encryptor_ != nullptr) {
172174
InitEncryption();
173175
}
@@ -226,6 +228,7 @@ class SerializedPageWriter : public PageWriter {
226228

227229
total_uncompressed_size_ += uncompressed_size + header_size;
228230
total_compressed_size_ += output_data_len + header_size;
231+
num_dict_pages_++;
229232

230233
PARQUET_ASSIGN_OR_THROW(int64_t final_pos, sink_->Tell());
231234
return final_pos - start_pos;
@@ -238,7 +241,7 @@ class SerializedPageWriter : public PageWriter {
238241
// index_page_offset = -1 since they are not supported
239242
metadata_->Finish(num_values_, dictionary_page_offset_, -1, data_page_offset_,
240243
total_compressed_size_, total_uncompressed_size_, has_dictionary,
241-
fallback, meta_encryptor_);
244+
fallback, num_dict_pages_, num_data_pages_, meta_encryptor_);
242245
// Write metadata at end of column chunk
243246
metadata_->WriteTo(sink_.get());
244247
}
@@ -310,7 +313,11 @@ class SerializedPageWriter : public PageWriter {
310313
total_uncompressed_size_ += uncompressed_size + header_size;
311314
total_compressed_size_ += output_data_len + header_size;
312315
num_values_ += page.num_values();
313-
316+
if (num_data_pages_.find(page.encoding()) != num_data_pages_.end()) {
317+
num_data_pages_[page.encoding()]++;
318+
} else {
319+
num_data_pages_[page.encoding()] = 1;
320+
}
314321
++page_ordinal_;
315322
PARQUET_ASSIGN_OR_THROW(int64_t current_pos, sink_->Tell());
316323
return current_pos - start_pos;
@@ -405,6 +412,9 @@ class SerializedPageWriter : public PageWriter {
405412
std::shared_ptr<Encryptor> data_encryptor_;
406413

407414
std::shared_ptr<ResizableBuffer> encryption_buffer_;
415+
416+
int32_t num_dict_pages_;
417+
std::map<Encoding::type, int32_t> num_data_pages_;
408418
};
409419

410420
// This implementation of the PageWriter writes to the final sink on Close .
@@ -441,7 +451,8 @@ class BufferedPageWriter : public PageWriter {
441451
metadata_->Finish(pager_->num_values(), dictionary_page_offset, -1,
442452
pager_->data_page_offset() + final_position,
443453
pager_->total_compressed_size(), pager_->total_uncompressed_size(),
444-
has_dictionary, fallback, pager_->meta_encryptor_);
454+
has_dictionary, fallback, pager_->num_dict_pages_,
455+
pager_->num_data_pages_, pager_->meta_encryptor_);
445456

446457
// Write metadata at end of column chunk
447458
metadata_->WriteTo(in_memory_sink_.get());

cpp/src/parquet/column_writer_test.cc

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -185,6 +185,39 @@ class TestPrimitiveWriter : public PrimitiveTypedTest<TestType> {
185185
{Encoding::RLE_DICTIONARY, Encoding::PLAIN, Encoding::RLE, Encoding::PLAIN});
186186
ASSERT_EQ(encodings, expected);
187187
}
188+
189+
std::vector<parquet::PageEncodingStats> encoding_stats =
190+
this->metadata_encoding_stats();
191+
if (this->type_num() == Type::BOOLEAN) {
192+
ASSERT_EQ(encoding_stats[0].encoding, Encoding::PLAIN);
193+
ASSERT_EQ(encoding_stats[0].page_type, PageType::DATA_PAGE);
194+
ASSERT_EQ(encoding_stats[1].encoding, Encoding::RLE);
195+
ASSERT_EQ(encoding_stats[1].page_type, PageType::DATA_PAGE);
196+
} else if (version == ParquetVersion::PARQUET_1_0) {
197+
std::vector<Encoding::type> expected(
198+
{Encoding::PLAIN_DICTIONARY, Encoding::PLAIN, Encoding::RLE, Encoding::PLAIN});
199+
for (size_t i = 0; i < encoding_stats.size(); i++) {
200+
if (i == 0) {
201+
ASSERT_EQ(encoding_stats[i].encoding, expected[i]);
202+
ASSERT_EQ(encoding_stats[i].page_type, PageType::DICTIONARY_PAGE);
203+
} else {
204+
ASSERT_EQ(encoding_stats[i].encoding, expected[i]);
205+
ASSERT_EQ(encoding_stats[i].page_type, PageType::DATA_PAGE);
206+
}
207+
}
208+
} else {
209+
std::vector<Encoding::type> expected(
210+
{Encoding::RLE_DICTIONARY, Encoding::PLAIN, Encoding::RLE, Encoding::PLAIN});
211+
for (size_t i = 0; i < encoding_stats.size(); i++) {
212+
if (i == 0) {
213+
ASSERT_EQ(encoding_stats[i].encoding, expected[i]);
214+
ASSERT_EQ(encoding_stats[i].page_type, PageType::DICTIONARY_PAGE);
215+
} else {
216+
ASSERT_EQ(encoding_stats[i].encoding, expected[i]);
217+
ASSERT_EQ(encoding_stats[i].page_type, PageType::DATA_PAGE);
218+
}
219+
}
220+
}
188221
}
189222

190223
void WriteRequiredWithSettings(Encoding::type encoding, Compression::type compression,
@@ -273,6 +306,15 @@ class TestPrimitiveWriter : public PrimitiveTypedTest<TestType> {
273306
return metadata_accessor->encodings();
274307
}
275308

309+
std::vector<parquet::PageEncodingStats> metadata_encoding_stats() {
310+
// Metadata accessor must be created lazily.
311+
// This is because the ColumnChunkMetaData semantics dictate the metadata object is
312+
// complete (no changes to the metadata buffer can be made after instantiation)
313+
auto metadata_accessor =
314+
ColumnChunkMetaData::Make(metadata_->contents(), this->descr_);
315+
return metadata_accessor->encoding_stats();
316+
}
317+
276318
protected:
277319
int64_t values_read_;
278320
// Keep the reader alive as for ByteArray the lifetime of the ByteArray

cpp/src/parquet/metadata.cc

Lines changed: 60 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -214,6 +214,14 @@ class ColumnChunkMetaData::ColumnChunkMetaDataImpl {
214214
for (auto encoding : column_metadata_->encodings) {
215215
encodings_.push_back(FromThrift(encoding));
216216
}
217+
auto fromthrift = [](format::PageEncodingStats page_encoding_stats) {
218+
return parquet::PageEncodingStats(FromThrift(page_encoding_stats.page_type),
219+
FromThrift(page_encoding_stats.encoding),
220+
page_encoding_stats.count);
221+
};
222+
for (auto encoding_stats : column_metadata_->encoding_stats) {
223+
encoding_stats_.push_back(fromthrift(encoding_stats));
224+
}
217225
possible_stats_ = nullptr;
218226
}
219227
// column chunk
@@ -257,6 +265,8 @@ class ColumnChunkMetaData::ColumnChunkMetaDataImpl {
257265

258266
const std::vector<Encoding::type>& encodings() const { return encodings_; }
259267

268+
const std::vector<PageEncodingStats>& encoding_stats() const { return encoding_stats_; }
269+
260270
inline bool has_dictionary_page() const {
261271
return column_metadata_->__isset.dictionary_page_offset;
262272
}
@@ -293,6 +303,7 @@ class ColumnChunkMetaData::ColumnChunkMetaDataImpl {
293303
private:
294304
mutable std::shared_ptr<Statistics> possible_stats_;
295305
std::vector<Encoding::type> encodings_;
306+
std::vector<PageEncodingStats> encoding_stats_;
296307
const format::ColumnChunk* column_;
297308
const format::ColumnMetaData* column_metadata_;
298309
format::ColumnMetaData decrypted_metadata_;
@@ -367,6 +378,10 @@ const std::vector<Encoding::type>& ColumnChunkMetaData::encodings() const {
367378
return impl_->encodings();
368379
}
369380

381+
const std::vector<PageEncodingStats>& ColumnChunkMetaData::encoding_stats() const {
382+
return impl_->encoding_stats();
383+
}
384+
370385
int64_t ColumnChunkMetaData::total_uncompressed_size() const {
371386
return impl_->total_uncompressed_size();
372387
}
@@ -966,7 +981,9 @@ class ColumnChunkMetaDataBuilder::ColumnChunkMetaDataBuilderImpl {
966981
void Finish(int64_t num_values, int64_t dictionary_page_offset,
967982
int64_t index_page_offset, int64_t data_page_offset,
968983
int64_t compressed_size, int64_t uncompressed_size, bool has_dictionary,
969-
bool dictionary_fallback, const std::shared_ptr<Encryptor>& encryptor) {
984+
bool dictionary_fallback, int32_t num_dict_pages,
985+
std::map<Encoding::type, int32_t>& num_data_pages,
986+
const std::shared_ptr<Encryptor>& encryptor) {
970987
if (dictionary_page_offset > 0) {
971988
column_chunk_->meta_data.__set_dictionary_page_offset(dictionary_page_offset);
972989
column_chunk_->__set_file_offset(dictionary_page_offset + compressed_size);
@@ -983,23 +1000,62 @@ class ColumnChunkMetaDataBuilder::ColumnChunkMetaDataBuilderImpl {
9831000
column_chunk_->meta_data.__set_total_compressed_size(compressed_size);
9841001

9851002
std::vector<format::Encoding::type> thrift_encodings;
1003+
std::vector<format::PageEncodingStats> thrift_encoding_stats;
9861004
if (has_dictionary) {
9871005
thrift_encodings.push_back(ToThrift(properties_->dictionary_index_encoding()));
1006+
format::PageEncodingStats dict_page_stats;
1007+
dict_page_stats.__set_page_type(format::PageType::DICTIONARY_PAGE);
1008+
dict_page_stats.__set_encoding(ToThrift(properties_->dictionary_index_encoding()));
1009+
dict_page_stats.__set_count(num_dict_pages);
1010+
thrift_encoding_stats.push_back(dict_page_stats);
1011+
// Add DataPage stats
1012+
format::PageEncodingStats data_page_stats;
9881013
if (properties_->version() == ParquetVersion::PARQUET_1_0) {
9891014
thrift_encodings.push_back(ToThrift(Encoding::PLAIN));
1015+
data_page_stats.__set_page_type(format::PageType::DATA_PAGE);
1016+
data_page_stats.__set_encoding(ToThrift(Encoding::PLAIN));
1017+
data_page_stats.__set_count(num_data_pages[Encoding::PLAIN]);
9901018
} else {
9911019
thrift_encodings.push_back(ToThrift(properties_->dictionary_page_encoding()));
1020+
data_page_stats.__set_page_type(format::PageType::DATA_PAGE);
1021+
data_page_stats.__set_encoding(ToThrift(properties_->dictionary_page_encoding()));
1022+
data_page_stats.__set_count(
1023+
num_data_pages[properties_->dictionary_page_encoding()]);
9921024
}
1025+
thrift_encoding_stats.push_back(data_page_stats);
9931026
} else { // Dictionary not enabled
9941027
thrift_encodings.push_back(ToThrift(properties_->encoding(column_->path())));
1028+
// Add DataPage stats
1029+
format::PageEncodingStats data_page_stats;
1030+
data_page_stats.__set_page_type(format::PageType::DATA_PAGE);
1031+
if (column_->physical_type() == Type::BOOLEAN) {
1032+
data_page_stats.__set_encoding(ToThrift(Encoding::PLAIN));
1033+
data_page_stats.__set_count(num_data_pages[Encoding::PLAIN]);
1034+
} else {
1035+
data_page_stats.__set_encoding(ToThrift(properties_->encoding(column_->path())));
1036+
data_page_stats.__set_count(
1037+
num_data_pages[properties_->encoding(column_->path())]);
1038+
}
1039+
thrift_encoding_stats.push_back(data_page_stats);
9951040
}
9961041
thrift_encodings.push_back(ToThrift(Encoding::RLE));
1042+
format::PageEncodingStats data_page_stats;
1043+
data_page_stats.__set_page_type(format::PageType::DATA_PAGE);
1044+
data_page_stats.__set_encoding(ToThrift(Encoding::RLE));
1045+
data_page_stats.__set_count(num_data_pages[Encoding::RLE]);
1046+
thrift_encoding_stats.push_back(data_page_stats);
9971047
// Only PLAIN encoding is supported for fallback in V1
9981048
// TODO(majetideepak): Use user specified encoding for V2
9991049
if (dictionary_fallback) {
10001050
thrift_encodings.push_back(ToThrift(Encoding::PLAIN));
1051+
format::PageEncodingStats fallback_page_stats;
1052+
fallback_page_stats.__set_page_type(format::PageType::DATA_PAGE);
1053+
fallback_page_stats.__set_encoding(ToThrift(Encoding::PLAIN));
1054+
fallback_page_stats.__set_count(num_data_pages[Encoding::PLAIN]);
1055+
thrift_encoding_stats.push_back(fallback_page_stats);
10011056
}
10021057
column_chunk_->meta_data.__set_encodings(thrift_encodings);
1058+
column_chunk_->meta_data.__set_encoding_stats(thrift_encoding_stats);
10031059

10041060
const auto& encrypt_md =
10051061
properties_->column_encryption_properties(column_->path()->ToDotString());
@@ -1122,11 +1178,12 @@ void ColumnChunkMetaDataBuilder::Finish(int64_t num_values,
11221178
int64_t index_page_offset,
11231179
int64_t data_page_offset, int64_t compressed_size,
11241180
int64_t uncompressed_size, bool has_dictionary,
1125-
bool dictionary_fallback,
1181+
bool dictionary_fallback, int32_t num_dict_pages,
1182+
std::map<Encoding::type, int32_t> num_data_pages,
11261183
const std::shared_ptr<Encryptor>& encryptor) {
11271184
impl_->Finish(num_values, dictionary_page_offset, index_page_offset, data_page_offset,
11281185
compressed_size, uncompressed_size, has_dictionary, dictionary_fallback,
1129-
encryptor);
1186+
num_dict_pages, num_data_pages, encryptor);
11301187
}
11311188

11321189
void ColumnChunkMetaDataBuilder::WriteTo(::arrow::io::OutputStream* sink) {

cpp/src/parquet/metadata.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
#include <map>
2222
#include <memory>
2323
#include <string>
24+
#include <utility>
2425
#include <vector>
2526

2627
#include "arrow/util/key_value_metadata.h"
@@ -150,6 +151,7 @@ class PARQUET_EXPORT ColumnChunkMetaData {
150151
bool can_decompress() const;
151152

152153
const std::vector<Encoding::type>& encodings() const;
154+
const std::vector<PageEncodingStats>& encoding_stats() const;
153155
bool has_dictionary_page() const;
154156
int64_t dictionary_page_offset() const;
155157
int64_t data_page_offset() const;
@@ -319,7 +321,8 @@ class PARQUET_EXPORT ColumnChunkMetaDataBuilder {
319321
void Finish(int64_t num_values, int64_t dictionary_page_offset,
320322
int64_t index_page_offset, int64_t data_page_offset,
321323
int64_t compressed_size, int64_t uncompressed_size, bool has_dictionary,
322-
bool dictionary_fallback,
324+
bool dictionary_fallback, int32_t num_dict_pages,
325+
std::map<Encoding::type, int32_t>,
323326
const std::shared_ptr<Encryptor>& encryptor = NULLPTR);
324327

325328
// The metadata contents, suitable for passing to ColumnChunkMetaData::Make

cpp/src/parquet/metadata_test.cc

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -35,18 +35,19 @@ std::unique_ptr<parquet::FileMetaData> GenerateTableMetaData(
3535
EncodedStatistics stats_int, EncodedStatistics stats_float) {
3636
auto f_builder = FileMetaDataBuilder::Make(&schema, props);
3737
auto rg1_builder = f_builder->AppendRowGroup();
38-
3938
// Write the metadata
4039
// rowgroup1 metadata
4140
auto col1_builder = rg1_builder->NextColumnChunk();
4241
auto col2_builder = rg1_builder->NextColumnChunk();
4342
// column metadata
43+
std::map<Encoding::type, int32_t> encoding_stats(
44+
{{Encoding::RLE_DICTIONARY, 1}, {Encoding::PLAIN, 1}, {Encoding::RLE, 1}});
4445
stats_int.set_is_signed(true);
4546
col1_builder->SetStatistics(stats_int);
4647
stats_float.set_is_signed(true);
4748
col2_builder->SetStatistics(stats_float);
48-
col1_builder->Finish(nrows / 2, 4, 0, 10, 512, 600, true, false);
49-
col2_builder->Finish(nrows / 2, 24, 0, 30, 512, 600, true, false);
49+
col1_builder->Finish(nrows / 2, 4, 0, 10, 512, 600, true, false, 1, encoding_stats);
50+
col2_builder->Finish(nrows / 2, 24, 0, 30, 512, 600, true, false, 1, encoding_stats);
5051

5152
rg1_builder->set_num_rows(nrows / 2);
5253
rg1_builder->Finish(1024);
@@ -58,8 +59,8 @@ std::unique_ptr<parquet::FileMetaData> GenerateTableMetaData(
5859
// column metadata
5960
col1_builder->SetStatistics(stats_int);
6061
col2_builder->SetStatistics(stats_float);
61-
col1_builder->Finish(nrows / 2, 6, 0, 10, 512, 600, true, false);
62-
col2_builder->Finish(nrows / 2, 16, 0, 26, 512, 600, true, false);
62+
col1_builder->Finish(nrows / 2, 6, 0, 10, 512, 600, true, false, 1, encoding_stats);
63+
col2_builder->Finish(nrows / 2, 16, 0, 26, 512, 600, true, false, 1, encoding_stats);
6364

6465
rg2_builder->set_num_rows(nrows / 2);
6566
rg2_builder->Finish(1024);
@@ -155,6 +156,8 @@ TEST(Metadata, TestBuildAccess) {
155156
ASSERT_EQ(24, rg1_column2->dictionary_page_offset());
156157
ASSERT_EQ(10, rg1_column1->data_page_offset());
157158
ASSERT_EQ(30, rg1_column2->data_page_offset());
159+
ASSERT_EQ(3, rg1_column1->encoding_stats().size());
160+
ASSERT_EQ(3, rg1_column2->encoding_stats().size());
158161

159162
auto rg2_accessor = f_accessors[loop_index]->RowGroup(1);
160163
ASSERT_EQ(2, rg2_accessor->num_columns());
@@ -187,6 +190,8 @@ TEST(Metadata, TestBuildAccess) {
187190
ASSERT_EQ(16, rg2_column2->dictionary_page_offset());
188191
ASSERT_EQ(10, rg2_column1->data_page_offset());
189192
ASSERT_EQ(26, rg2_column2->data_page_offset());
193+
ASSERT_EQ(3, rg2_column1->encoding_stats().size());
194+
ASSERT_EQ(3, rg2_column2->encoding_stats().size());
190195

191196
// Test FileMetaData::set_file_path
192197
ASSERT_TRUE(rg2_column1->file_path().empty());

cpp/src/parquet/thrift_internal.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,10 @@ static inline Encoding::type FromThrift(format::Encoding::type type) {
8282
return static_cast<Encoding::type>(type);
8383
}
8484

85+
static inline PageType::type FromThrift(format::PageType::type type) {
86+
return static_cast<PageType::type>(type);
87+
}
88+
8589
static inline AadMetadata FromThrift(format::AesGcmV1 aesGcmV1) {
8690
return AadMetadata{aesGcmV1.aad_prefix, aesGcmV1.aad_file_unique,
8791
aesGcmV1.supply_aad_prefix};

cpp/src/parquet/types.cc

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -335,6 +335,24 @@ SortOrder::type GetSortOrder(const std::shared_ptr<const LogicalType>& logical_t
335335
ColumnOrder ColumnOrder::undefined_ = ColumnOrder(ColumnOrder::UNDEFINED);
336336
ColumnOrder ColumnOrder::type_defined_ = ColumnOrder(ColumnOrder::TYPE_DEFINED_ORDER);
337337

338+
void PageEncodingStats::set_page_type(const PageType::type val) { this->page_type = val; }
339+
340+
void PageEncodingStats::set_encoding(const Encoding::type val) { this->encoding = val; }
341+
342+
void PageEncodingStats::set_count(const int32_t val) { this->count = val; }
343+
344+
PageEncodingStats::PageEncodingStats(const PageEncodingStats& obj) {
345+
page_type = obj.page_type;
346+
encoding = obj.encoding;
347+
count = obj.count;
348+
}
349+
PageEncodingStats& PageEncodingStats::operator=(const PageEncodingStats& obj) {
350+
page_type = obj.page_type;
351+
encoding = obj.encoding;
352+
count = obj.count;
353+
return *this;
354+
}
355+
338356
// Static methods for LogicalType class
339357

340358
std::shared_ptr<const LogicalType> LogicalType::FromConvertedType(

0 commit comments

Comments
 (0)