Skip to content

Commit 15427b3

Browse files
Enmk and arthurpassos
authored and committed
Merge pull request #586 from Altinity/metadata_cache_for_parquet_24_12_2
Parquet File Metadata caching implementation
1 parent c4d9b13 commit 15427b3

File tree

9 files changed

+151
-6
lines changed

9 files changed

+151
-6
lines changed

src/Common/ProfileEvents.cpp

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -968,7 +968,9 @@ The server successfully detected this situation and will download merged part fr
968968
M(FilterTransformPassedRows, "Number of rows that passed the filter in the query", ValueType::Number) \
969969
M(FilterTransformPassedBytes, "Number of bytes that passed the filter in the query", ValueType::Bytes) \
970970
M(QueryPreempted, "How many times tasks are paused and waiting due to 'priority' setting", ValueType::Number) \
971-
971+
M(ParquetFetchWaitTimeMicroseconds, "Time of waiting fetching parquet data", ValueType::Microseconds) \
972+
M(ParquetMetaDataCacheHits, "Number of times Parquet file metadata was found in the metadata cache.", ValueType::Number) \
973+
M(ParquetMetaDataCacheMisses, "Number of times Parquet file metadata was not found in the metadata cache and had to be read from the file.", ValueType::Number) \
972974

973975
#ifdef APPLY_FOR_EXTERNAL_EVENTS
974976
#define APPLY_FOR_EVENTS(M) APPLY_FOR_BUILTIN_EVENTS(M) APPLY_FOR_EXTERNAL_EVENTS(M)

src/Core/FormatFactorySettings.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1301,7 +1301,7 @@ Set the quoting rule for identifiers in SHOW CREATE query
13011301
DECLARE(IdentifierQuotingStyle, show_create_query_identifier_quoting_style, IdentifierQuotingStyle::Backticks, R"(
13021302
Set the quoting style for identifiers in SHOW CREATE query
13031303
)", 0) \
1304-
1304+
DECLARE(Bool, input_format_parquet_use_metadata_cache, false, R"(Enable parquet file metadata caching)", 0) \
13051305
// End of FORMAT_FACTORY_SETTINGS
13061306

13071307
#define OBSOLETE_FORMAT_SETTINGS(M, ALIAS) \

src/Core/ServerSettings.cpp

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1019,9 +1019,8 @@ namespace DB
10191019
<wait_dictionaries_load_at_startup>true</wait_dictionaries_load_at_startup>
10201020
```
10211021
)", 0) \
1022-
DECLARE(Bool, storage_shared_set_join_use_inner_uuid, false, "If enabled, an inner UUID is generated during the creation of SharedSet and SharedJoin. ClickHouse Cloud only", 0)
1023-
1024-
1022+
DECLARE(Bool, storage_shared_set_join_use_inner_uuid, false, "If enabled, an inner UUID is generated during the creation of SharedSet and SharedJoin. ClickHouse Cloud only", 0) \
1023+
DECLARE(UInt64, input_format_parquet_metadata_cache_max_size, 500000000, "Maximum size of parquet file metadata cache", 0) \
10251024
// clang-format on
10261025

10271026
/// If you add a setting which can be updated at runtime, please update 'changeable_settings' map in dumpToSystemServerSettingsColumns below

src/Processors/Formats/IInputFormat.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,9 @@ class IInputFormat : public SourceWithKeyCondition
6767

6868
void needOnlyCount() { need_only_count = true; }
6969

70+
/// Set additional info/key/id related to underlying storage of the ReadBuffer.
/// The default implementation ignores all three arguments and does nothing;
/// it is virtual so that a format able to use such a key can override it.
71+
virtual void setStorageRelatedUniqueKey(const ServerSettings & /* server_settings */, const Settings & /*settings*/, const String & /*key*/) {}
72+
7073
protected:
7174
ReadBuffer & getReadBuffer() const { chassert(in); return *in; }
7275

src/Processors/Formats/Impl/ParquetBlockInputFormat.cpp

Lines changed: 84 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,9 @@
33
#if USE_PARQUET
44

55
#include <Columns/ColumnNullable.h>
6+
#include <Core/Settings.h>
7+
#include <Core/ServerSettings.h>
8+
#include <Common/ProfileEvents.h>
69
#include <Common/logger_useful.h>
710
#include <Common/ThreadPool.h>
811
#include <Formats/FormatFactory.h>
@@ -35,6 +38,8 @@
3538
/// Profile event counters used in this file. They are only declared here (extern);
/// the definitions live elsewhere.
namespace ProfileEvents
3639
{
3740
extern const Event ParquetFetchWaitTimeMicroseconds;
41+
/// Incremented by getFileMetaData() when the metadata cache lookup did not load a new value.
extern const Event ParquetMetaDataCacheHits;
42+
/// Incremented by getFileMetaData() when the metadata had to be loaded.
extern const Event ParquetMetaDataCacheMisses;
3843
}
3944

4045
namespace CurrentMetrics
@@ -51,6 +56,16 @@ namespace CurrentMetrics
5156
namespace DB
5257
{
5358

59+
/// Query-level settings read in this file (declared extern, defined elsewhere).
namespace Setting
60+
{
61+
/// Read in setStorageRelatedUniqueKey() to decide whether the metadata cache is used.
extern const SettingsBool input_format_parquet_use_metadata_cache;
62+
}
63+
64+
/// Server-level settings read in this file (declared extern, defined elsewhere).
namespace ServerSetting
65+
{
66+
/// Read in setStorageRelatedUniqueKey(); later passed to ParquetFileMetaDataCache::instance().
extern const ServerSettingsUInt64 input_format_parquet_metadata_cache_max_size;
67+
}
68+
5469
namespace ErrorCodes
5570
{
5671
extern const int BAD_ARGUMENTS;
@@ -510,6 +525,58 @@ static std::vector<Range> getHyperrectangleForRowGroup(const parquet::FileMetaDa
510525
return hyperrectangle;
511526
}
512527

528+
/// Constructs the cache, forwarding `max_size_bytes` unchanged to the CacheBase constructor.
/// NOTE(review): the class derives from CacheBase<String, parquet::FileMetaData> with no explicit
/// weight function, so the limit may effectively count entries rather than bytes — confirm
/// against CacheBase's default weight function.
ParquetFileMetaDataCache::ParquetFileMetaDataCache(UInt64 max_size_bytes)
529+
: CacheBase(max_size_bytes) {}
530+
531+
/// Returns a pointer to the single shared cache object.
/// The object is a function-local static, so it is constructed only on the first call;
/// the `max_size_bytes` argument of every later call is ignored.
ParquetFileMetaDataCache * ParquetFileMetaDataCache::instance(UInt64 max_size_bytes)
532+
{
533+
static ParquetFileMetaDataCache instance(max_size_bytes);
534+
return &instance;
535+
}
536+
537+
/// Reads the Parquet file metadata directly from the input, bypassing the cache.
/// Makes sure `arrow_file` exists first, then passes it to parquet::ReadMetaData.
std::shared_ptr<parquet::FileMetaData> ParquetBlockInputFormat::readMetadataFromFile()
538+
{
539+
createArrowFileIfNotCreated();
540+
return parquet::ReadMetaData(arrow_file);
541+
}
542+
543+
/// Returns the Parquet file metadata for this input.
/// When caching is disabled or no cache key has been set, the metadata is read straight from
/// the file. Otherwise it is obtained through the shared ParquetFileMetaDataCache under
/// `metadata_cache.key`, and a hit/miss profile event is recorded.
std::shared_ptr<parquet::FileMetaData> ParquetBlockInputFormat::getFileMetaData()
544+
{
545+
// The in-memory cache is not implemented for local file operations, only for remote files.
546+
// There is a chance the user sets `input_format_parquet_use_metadata_cache=1` for a local file operation
547+
// and the cache key won't be set. Therefore, we also need to check for metadata_cache.key.
548+
if (!metadata_cache.use_cache || metadata_cache.key.empty())
549+
{
550+
return readMetadataFromFile();
551+
}
552+
553+
// The lambda is the loader handed to getOrSet; it reads the metadata from the file.
// NOTE(review): presumably getOrSet invokes it only when the key is absent — confirm in CacheBase.
auto [parquet_file_metadata, loaded] = ParquetFileMetaDataCache::instance(metadata_cache.max_size_bytes)->getOrSet(
554+
metadata_cache.key,
555+
[&]()
556+
{
557+
return readMetadataFromFile();
558+
}
559+
);
560+
// `loaded == true` is counted as a cache miss, `false` as a cache hit.
if (loaded)
561+
ProfileEvents::increment(ProfileEvents::ParquetMetaDataCacheMisses);
562+
else
563+
ProfileEvents::increment(ProfileEvents::ParquetMetaDataCacheHits);
564+
return parquet_file_metadata;
565+
}
566+
567+
/// Creates `arrow_file` from the input buffer `in` unless it has already been created.
/// Calling it again after `arrow_file` is set is a no-op.
void ParquetBlockInputFormat::createArrowFileIfNotCreated()
568+
{
569+
// Already created: nothing to do.
if (arrow_file)
570+
{
571+
return;
572+
}
573+
574+
// Create arrow file adapter.
575+
// TODO: Make the adapter do prefetching on IO threads, based on the full set of ranges that
576+
// we'll need to read (which we know in advance). Use max_download_threads for that.
577+
arrow_file = asArrowFile(*in, format_settings, is_stopped, "Parquet", PARQUET_MAGIC_BYTES, /* avoid_buffering */ true);
578+
}
579+
513580
std::unordered_set<std::size_t> getBloomFilterFilteringColumnKeys(const KeyCondition::RPN & rpn)
514581
{
515582
std::unordered_set<std::size_t> column_keys;
@@ -609,7 +676,7 @@ void ParquetBlockInputFormat::initializeIfNeeded()
609676
if (is_stopped)
610677
return;
611678

612-
metadata = parquet::ReadMetaData(arrow_file);
679+
metadata = getFileMetaData();
613680
const bool prefetch_group = supportPrefetch();
614681

615682
std::shared_ptr<arrow::Schema> schema;
@@ -709,6 +776,8 @@ void ParquetBlockInputFormat::initializeIfNeeded()
709776
}
710777
}
711778

779+
bool has_row_groups_to_read = false;
780+
712781
auto skip_row_group_based_on_filters = [&](int row_group)
713782
{
714783
if (!format_settings.parquet.filter_push_down && !format_settings.parquet.bloom_filter_push_down)
@@ -758,9 +827,23 @@ void ParquetBlockInputFormat::initializeIfNeeded()
758827
row_group_batches.back().total_bytes_compressed += row_group_size;
759828
auto rows = adaptive_chunk_size(row_group);
760829
row_group_batches.back().adaptive_chunk_size = rows ? rows : format_settings.parquet.max_block_size;
830+
831+
has_row_groups_to_read = true;
832+
}
833+
834+
if (has_row_groups_to_read)
835+
{
836+
createArrowFileIfNotCreated();
761837
}
762838
}
763839

840+
/// Records the metadata-cache configuration for this input format:
///  - `key_` becomes the cache key (an empty key makes getFileMetaData() bypass the cache);
///  - `settings` supplies input_format_parquet_use_metadata_cache;
///  - `server_settings` supplies input_format_parquet_metadata_cache_max_size.
/// Nothing is read or cached here; the values are only stored in `metadata_cache`.
void ParquetBlockInputFormat::setStorageRelatedUniqueKey(const ServerSettings & server_settings, const Settings & settings, const String & key_)
841+
{
842+
metadata_cache.key = key_;
843+
metadata_cache.use_cache = settings[Setting::input_format_parquet_use_metadata_cache];
844+
metadata_cache.max_size_bytes = server_settings[ServerSetting::input_format_parquet_metadata_cache_max_size];
845+
}
846+
764847
void ParquetBlockInputFormat::initializeRowGroupBatchReader(size_t row_group_batch_idx)
765848
{
766849
const bool row_group_prefetch = supportPrefetch();

src/Processors/Formats/Impl/ParquetBlockInputFormat.h

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
#include "config.h"
33
#if USE_PARQUET
44

5+
#include <Common/CacheBase.h>
56
#include <Processors/Formats/IInputFormat.h>
67
#include <Processors/Formats/ISchemaReader.h>
78
#include <Formats/FormatSettings.h>
@@ -72,6 +73,8 @@ class ParquetBlockInputFormat : public IInputFormat
7273

7374
size_t getApproxBytesReadForChunk() const override { return previous_approx_bytes_read_for_chunk; }
7475

76+
void setStorageRelatedUniqueKey(const ServerSettings & server_settings, const Settings & settings, const String & key_) override;
77+
7578
private:
7679
Chunk read() override;
7780

@@ -90,6 +93,11 @@ class ParquetBlockInputFormat : public IInputFormat
9093

9194
void threadFunction(size_t row_group_batch_idx);
9295

96+
void createArrowFileIfNotCreated();
97+
std::shared_ptr<parquet::FileMetaData> readMetadataFromFile();
98+
99+
std::shared_ptr<parquet::FileMetaData> getFileMetaData();
100+
93101
inline bool supportPrefetch() const;
94102

95103
// Data layout in the file:
@@ -338,6 +346,12 @@ class ParquetBlockInputFormat : public IInputFormat
338346
std::exception_ptr background_exception = nullptr;
339347
std::atomic<int> is_stopped{0};
340348
bool is_initialized = false;
349+
/// Metadata-cache configuration for this format instance; filled in by setStorageRelatedUniqueKey().
struct Cache
350+
{
351+
/// Cache key; while empty, getFileMetaData() reads the metadata directly from the file.
String key;
352+
/// Whether the metadata cache should be consulted at all (off by default).
bool use_cache = false;
353+
/// Size limit passed to ParquetFileMetaDataCache::instance().
UInt64 max_size_bytes{0};
354+
} metadata_cache;
341355
};
342356

343357
class ParquetSchemaReader : public ISchemaReader
@@ -356,6 +370,16 @@ class ParquetSchemaReader : public ISchemaReader
356370
std::shared_ptr<parquet::FileMetaData> metadata;
357371
};
358372

373+
/// Cache of parsed Parquet file metadata, keyed by a String.
/// The constructor is private, so the only way to obtain an object is through instance().
class ParquetFileMetaDataCache : public CacheBase<String, parquet::FileMetaData>
374+
{
375+
public:
376+
/// Returns the shared cache object; see the definition for how `max_size_bytes` is used.
static ParquetFileMetaDataCache * instance(UInt64 max_size_bytes);
377+
/// Intentionally empty body: calling clear() on this type does nothing.
/// NOTE(review): if CacheBase declares its own clear(), this presumably hides it — confirm that is intended.
void clear() {}
378+
379+
private:
380+
ParquetFileMetaDataCache(UInt64 max_size_bytes);
381+
};
382+
359383
}
360384

361385
#endif

src/Storages/ObjectStorage/StorageObjectStorageSource.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -499,6 +499,9 @@ StorageObjectStorageSource::ReaderHolder StorageObjectStorageSource::createReade
499499
if (need_only_count)
500500
input_format->needOnlyCount();
501501

502+
if (!object_info->getPath().empty())
503+
input_format->setStorageRelatedUniqueKey(context_->getServerSettings(), context_->getSettingsRef(), object_info->getPath() + ":" + object_info->metadata->etag);
504+
502505
builder.init(Pipe(input_format));
503506

504507
if (auto transformer = configuration->getSchemaTransformer(object_info->getPath()))
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
10
2+
10
3+
10
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
-- Tags: no-parallel, no-fasttest
2+
3+
-- Start from a clean state.
DROP TABLE IF EXISTS t_parquet_03262;
4+
5+
-- S3-backed Parquet table partitioned by `a`; the partition id is part of the object name.
CREATE TABLE t_parquet_03262 (a UInt64)
6+
ENGINE = S3(s3_conn, filename = 'test_03262_{_partition_id}', format = Parquet)
7+
PARTITION BY a;
8+
9+
-- Write 10 rows with distinct values of `a` (presumably one object per value — confirm).
INSERT INTO t_parquet_03262 SELECT number FROM numbers(10) SETTINGS s3_truncate_on_insert=1;
10+
11+
-- First read with the metadata cache enabled.
SELECT COUNT(*)
12+
FROM s3(s3_conn, filename = 'test_03262_*', format = Parquet)
13+
SETTINGS input_format_parquet_use_metadata_cache=1;
14+
15+
-- Same read again, tagged with log_comment so that it can be found in system.query_log below.
SELECT COUNT(*)
16+
FROM s3(s3_conn, filename = 'test_03262_*', format = Parquet)
17+
SETTINGS input_format_parquet_use_metadata_cache=1, log_comment='test_03262_parquet_metadata_cache';
18+
19+
-- Make sure the query log entries are written before querying them.
SYSTEM FLUSH LOGS;
20+
21+
-- Report the ParquetMetaDataCacheHits counter of the most recent finished tagged query.
SELECT ProfileEvents['ParquetMetaDataCacheHits']
22+
FROM system.query_log
23+
where log_comment = 'test_03262_parquet_metadata_cache'
24+
AND type = 'QueryFinish'
25+
ORDER BY event_time desc
26+
LIMIT 1;
27+
28+
-- Clean up.
DROP TABLE t_parquet_03262;

0 commit comments

Comments
 (0)