From 417d5bb6f3c55adfeed2adcce3380e9e0a08fc47 Mon Sep 17 00:00:00 2001 From: Arthur Passos Date: Thu, 9 Jan 2025 15:23:35 -0300 Subject: [PATCH 1/7] impl --- src/Common/ProfileEvents.cpp | 3 +- src/Core/FormatFactorySettings.h | 4 +- src/Core/ServerSettings.cpp | 4 +- src/Processors/Formats/IInputFormat.h | 3 + .../Formats/Impl/ParquetBlockInputFormat.cpp | 75 ++++++++++++++++++- .../Formats/Impl/ParquetBlockInputFormat.h | 24 ++++++ .../StorageObjectStorageSource.cpp | 3 + ...et_object_storage_metadata_cache.reference | 0 ..._parquet_object_storage_metadata_cache.sql | 0 9 files changed, 110 insertions(+), 6 deletions(-) create mode 100644 tests/queries/0_stateless/03299_parquet_object_storage_metadata_cache.reference create mode 100644 tests/queries/0_stateless/03299_parquet_object_storage_metadata_cache.sql diff --git a/src/Common/ProfileEvents.cpp b/src/Common/ProfileEvents.cpp index 491b7ecefc3d..a82c6872dcd7 100644 --- a/src/Common/ProfileEvents.cpp +++ b/src/Common/ProfileEvents.cpp @@ -915,7 +915,8 @@ The server successfully detected this situation and will download merged part fr M(MemoryWorkerRunElapsedMicroseconds, "Total time spent by MemoryWorker for background work", ValueType::Microseconds) \ \ M(ParquetFetchWaitTimeMicroseconds, "Time of waiting fetching parquet data", ValueType::Microseconds) \ - + M(ParquetMetaDataCacheHits, "Number of times the read from filesystem cache hit the cache.") \ + M(ParquetMetaDataCacheMisses, "Number of times the read from filesystem cache miss the cache.") \ #ifdef APPLY_FOR_EXTERNAL_EVENTS #define APPLY_FOR_EVENTS(M) APPLY_FOR_BUILTIN_EVENTS(M) APPLY_FOR_EXTERNAL_EVENTS(M) diff --git a/src/Core/FormatFactorySettings.h b/src/Core/FormatFactorySettings.h index 88cc4c6e9a67..fcd9e6aa8f27 100644 --- a/src/Core/FormatFactorySettings.h +++ b/src/Core/FormatFactorySettings.h @@ -1250,8 +1250,8 @@ Set the quoting rule for identifiers in SHOW CREATE query )", 0) \ DECLARE(IdentifierQuotingStyle, show_create_query_identifier_quoting_style, IdentifierQuotingStyle::Backticks, R"( Set the quoting style for identifiers in SHOW CREATE query -)", 0) \ - +)", 0) \ + DECLARE(Bool, input_format_parquet_use_metadata_cache, false, "Enable parquet file metadata caching, 0) \ // End of FORMAT_FACTORY_SETTINGS #define OBSOLETE_FORMAT_SETTINGS(M, ALIAS) \ diff --git a/src/Core/ServerSettings.cpp b/src/Core/ServerSettings.cpp index 67c00d718233..a2a933cf77d0 100644 --- a/src/Core/ServerSettings.cpp +++ b/src/Core/ServerSettings.cpp @@ -212,8 +212,8 @@ namespace DB DECLARE(UInt64, threadpool_writer_queue_size, 1000000, "Number of tasks which is possible to push into background pool for write requests to object storages", 0) \ DECLARE(UInt64, iceberg_catalog_threadpool_pool_size, 50, "Size of background pool for iceberg catalog", 0) \ DECLARE(UInt64, iceberg_catalog_threadpool_queue_size, 1000000, "Number of tasks which is possible to push into iceberg catalog pool", 0) \ - DECLARE(UInt32, allow_feature_tier, 0, "0 - All feature tiers allowed (experimental, beta, production). 1 - Only beta and production feature tiers allowed. 2 - Only production feature tier allowed", 0) \ - + DECLARE(UInt32, allow_feature_tier, 0, "0 - All feature tiers allowed (experimental, beta, production). 1 - Only beta and production feature tiers allowed. 2 - Only production feature tier allowed", 0) \ + DECLARE(UInt64, input_format_parquet_metadata_cache_max_entries, 100000, "Maximum number of parquet file metadata to cache", 0) \ // clang-format on diff --git a/src/Processors/Formats/IInputFormat.h b/src/Processors/Formats/IInputFormat.h index e4f9fc318f55..900d4edc39f1 100644 --- a/src/Processors/Formats/IInputFormat.h +++ b/src/Processors/Formats/IInputFormat.h @@ -67,6 +67,9 @@ class IInputFormat : public SourceWithKeyCondition void needOnlyCount() { need_only_count = true; } + /// Set additional info/key/id related to underlying storage of the ReadBuffer + virtual void setStorageRelatedUniqueKey(const ServerSettings & /* server_settings */, const Settings & /*settings*/, const String & /*key*/) {} + protected: ReadBuffer & getReadBuffer() const { chassert(in); return *in; } diff --git a/src/Processors/Formats/Impl/ParquetBlockInputFormat.cpp b/src/Processors/Formats/Impl/ParquetBlockInputFormat.cpp index f9567ec90f0d..db8ea7f00026 100644 --- a/src/Processors/Formats/Impl/ParquetBlockInputFormat.cpp +++ b/src/Processors/Formats/Impl/ParquetBlockInputFormat.cpp @@ -3,6 +3,9 @@ #if USE_PARQUET +#include +#include +#include #include #include #include @@ -33,6 +36,8 @@ namespace ProfileEvents { extern const Event ParquetFetchWaitTimeMicroseconds; + extern const Event ParquetMetaDataCacheHits; + extern const Event ParquetMetaDataCacheMisses; } namespace CurrentMetrics @@ -484,6 +489,58 @@ static std::vector getHyperrectangleForRowGroup(const parquet::FileMetaDa return hyperrectangle; } +ParquetFileMetaDataCache::ParquetFileMetaDataCache(UInt64 max_cache_entries) + : CacheBase(max_cache_entries) {} + +ParquetFileMetaDataCache * ParquetFileMetaDataCache::instance(UInt64 max_cache_entries) +{ + static ParquetFileMetaDataCache instance(max_cache_entries); + return &instance; +} + +std::shared_ptr ParquetBlockInputFormat::readMetadataFromFile() +{ + createArrowFileIfNotCreated(); + return parquet::ReadMetaData(arrow_file); +} + +std::shared_ptr ParquetBlockInputFormat::getFileMetaData() +{ + // in-memory cache is not implemented for local file operations, only for remote files + // there is a chance the user sets `input_format_parquet_use_metadata_cache=1` for a local file operation + // and the cache_key won't be set. Therefore, we also need to check for metadata_cache.key + if (!metadata_cache.use_cache || metadata_cache.key.empty()) + { + return readMetadataFromFile(); + } + + auto [parquet_file_metadata, loaded] = ParquetFileMetaDataCache::instance(metadata_cache.max_entries)->getOrSet( + metadata_cache.key, + [&]() + { + return readMetadataFromFile(); + } + ); + if (loaded) + ProfileEvents::increment(ProfileEvents::ParquetMetaDataCacheMisses); + else + ProfileEvents::increment(ProfileEvents::ParquetMetaDataCacheHits); + return parquet_file_metadata; +} + +void ParquetBlockInputFormat::createArrowFileIfNotCreated() +{ + if (arrow_file) + { + return; + } + + // Create arrow file adapter. + // TODO: Make the adapter do prefetching on IO threads, based on the full set of ranges that + // we'll need to read (which we know in advance). Use max_download_threads for that. + arrow_file = asArrowFile(*in, format_settings, is_stopped, "Parquet", PARQUET_MAGIC_BYTES, /* avoid_buffering */ true); +} + ParquetBlockInputFormat::ParquetBlockInputFormat( ReadBuffer & buf, const Block & header_, @@ -528,7 +585,7 @@ void ParquetBlockInputFormat::initializeIfNeeded() if (is_stopped) return; - metadata = parquet::ReadMetaData(arrow_file); + metadata = getFileMetaData(); const bool prefetch_group = supportPrefetch(); std::shared_ptr schema; @@ -594,6 +651,8 @@ void ParquetBlockInputFormat::initializeIfNeeded() filtering_columns = parquet_bloom_filter_condition->getFilteringColumnKeys(); } + bool has_row_groups_to_read = false; + for (int row_group = 0; row_group < num_row_groups; ++row_group) { if (skip_row_groups.contains(row_group)) @@ -628,7 +687,21 @@ void ParquetBlockInputFormat::initializeIfNeeded() row_group_batches.back().total_bytes_compressed += row_group_size; auto rows = adaptive_chunk_size(row_group); row_group_batches.back().adaptive_chunk_size = rows ? rows : format_settings.parquet.max_block_size; + + has_row_groups_to_read = true; } + + if (has_row_groups_to_read) + { + createArrowFileIfNotCreated(); + } +} + +void ParquetBlockInputFormat::setStorageRelatedUniqueKey(const ServerSettings & server_settings, const Settings & settings, const String & key_) +{ + metadata_cache.key = key_; + metadata_cache.use_cache = settings.input_format_parquet_use_metadata_cache; + metadata_cache.max_entries = server_settings.input_format_parquet_metadata_cache_max_entries; } void ParquetBlockInputFormat::initializeRowGroupBatchReader(size_t row_group_batch_idx) diff --git a/src/Processors/Formats/Impl/ParquetBlockInputFormat.h b/src/Processors/Formats/Impl/ParquetBlockInputFormat.h index f941bb70ebce..d0cc86a9c794 100644 --- a/src/Processors/Formats/Impl/ParquetBlockInputFormat.h +++ b/src/Processors/Formats/Impl/ParquetBlockInputFormat.h @@ -2,6 +2,7 @@ #include "config.h" #if USE_PARQUET +#include #include #include #include @@ -72,6 +73,8 @@ class ParquetBlockInputFormat : public IInputFormat size_t getApproxBytesReadForChunk() const override { return previous_approx_bytes_read_for_chunk; } + void setStorageRelatedUniqueKey(const ServerSettings & server_settings, const Settings & settings, const String & key_) override; + private: Chunk read() override; @@ -90,6 +93,11 @@ class ParquetBlockInputFormat : public IInputFormat void threadFunction(size_t row_group_batch_idx); + void createArrowFileIfNotCreated(); + std::shared_ptr readMetadataFromFile(); + + std::shared_ptr getFileMetaData(); + inline bool supportPrefetch() const; // Data layout in the file: @@ -338,6 +346,12 @@ class ParquetBlockInputFormat : public IInputFormat std::exception_ptr background_exception = nullptr; std::atomic is_stopped{0}; bool is_initialized = false; + struct Cache + { + String key; + bool use_cache = false; + UInt64 max_entries{0}; + } metadata_cache; }; class ParquetSchemaReader : public ISchemaReader @@ -356,6 +370,16 @@ class ParquetSchemaReader : public ISchemaReader std::shared_ptr metadata; }; +class ParquetFileMetaDataCache : public CacheBase +{ +public: + static ParquetFileMetaDataCache * instance(UInt64 max_cache_entries); + void clear() {} + +private: + ParquetFileMetaDataCache(UInt64 max_cache_entries); +}; + } #endif diff --git a/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp b/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp index 168611a0e923..b01566d00ca3 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp @@ -414,6 +414,9 @@ StorageObjectStorageSource::ReaderHolder StorageObjectStorageSource::createReade if (need_only_count) input_format->needOnlyCount(); + if (!object_info->getPath().empty()) + input_format->setStorageRelatedUniqueKey(context_->getServerSettings(), context_->getSettingsRef(), object_info->getPath() + ":" + object_info->metadata->etag); + builder.init(Pipe(input_format)); if (auto transformer = configuration->getSchemaTransformer(object_info->getPath())) diff --git a/tests/queries/0_stateless/03299_parquet_object_storage_metadata_cache.reference b/tests/queries/0_stateless/03299_parquet_object_storage_metadata_cache.reference new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/tests/queries/0_stateless/03299_parquet_object_storage_metadata_cache.sql b/tests/queries/0_stateless/03299_parquet_object_storage_metadata_cache.sql new file mode 100644 index 000000000000..e69de29bb2d1 From 0c1dedf12a4fa019b3f6ab084e6bcbd1a2e4ab3a Mon Sep 17 00:00:00 2001 From: Arthur Passos Date: Thu, 9 Jan 2025 15:29:50 -0300 Subject: [PATCH 2/7] add tests --- ...et_object_storage_metadata_cache.reference | 3 ++ ..._parquet_object_storage_metadata_cache.sql | 28 +++++++++++++++++++ 2 files changed, 31 insertions(+) diff --git a/tests/queries/0_stateless/03299_parquet_object_storage_metadata_cache.reference b/tests/queries/0_stateless/03299_parquet_object_storage_metadata_cache.reference index e69de29bb2d1..51fdf048b8ac 100644 --- a/tests/queries/0_stateless/03299_parquet_object_storage_metadata_cache.reference +++ b/tests/queries/0_stateless/03299_parquet_object_storage_metadata_cache.reference @@ -0,0 +1,3 @@ +10 +10 +10 diff --git a/tests/queries/0_stateless/03299_parquet_object_storage_metadata_cache.sql b/tests/queries/0_stateless/03299_parquet_object_storage_metadata_cache.sql index e69de29bb2d1..f453dcba0c66 100644 --- a/tests/queries/0_stateless/03299_parquet_object_storage_metadata_cache.sql +++ b/tests/queries/0_stateless/03299_parquet_object_storage_metadata_cache.sql @@ -0,0 +1,28 @@ +-- Tags: no-parallel, no-fasttest + +DROP TABLE IF EXISTS t_parquet_03262; + +CREATE TABLE t_parquet_03262 (a UInt64) +ENGINE = S3(s3_conn, filename = 'test_03262_{_partition_id}', format = Parquet) +PARTITION BY a; + +INSERT INTO t_parquet_03262 SELECT number FROM numbers(10) SETTINGS s3_truncate_on_insert=1; + +SELECT COUNT(*) +FROM s3(s3_conn, filename = 'test_03262_*', format = Parquet) +SETTINGS input_format_parquet_use_metadata_cache=1; + +SELECT COUNT(*) +FROM s3(s3_conn, filename = 'test_03262_*', format = Parquet) +SETTINGS input_format_parquet_use_metadata_cache=1, log_comment='test_03262_parquet_metadata_cache'; + +SYSTEM FLUSH LOGS; + +SELECT ProfileEvents['ParquetMetaDataCacheHits'] +FROM system.query_log +where log_comment = 'test_03262_parquet_metadata_cache' +AND type = 'QueryFinish' +ORDER BY event_time desc +LIMIT 1; + +DROP TABLE t_parquet_03262; From 401a6992f75cfcd363af2e6eaffe8cf580dd1623 Mon Sep 17 00:00:00 2001 From: Arthur Passos Date: Fri, 10 Jan 2025 11:29:49 -0300 Subject: [PATCH 3/7] small updates --- src/Common/ProfileEvents.cpp | 7 ++++--- src/Core/FormatFactorySettings.h | 4 ++-- .../Formats/Impl/ParquetBlockInputFormat.cpp | 14 ++++++++++++-- 3 files changed, 18 insertions(+), 7 deletions(-) diff --git a/src/Common/ProfileEvents.cpp b/src/Common/ProfileEvents.cpp index a82c6872dcd7..030a0f0cc77d 100644 --- a/src/Common/ProfileEvents.cpp +++ b/src/Common/ProfileEvents.cpp @@ -914,9 +914,10 @@ The server successfully detected this situation and will download merged part fr M(MemoryWorkerRun, "Number of runs done by MemoryWorker in background", ValueType::Number) \ M(MemoryWorkerRunElapsedMicroseconds, "Total time spent by MemoryWorker for background work", ValueType::Microseconds) \ \ - M(ParquetFetchWaitTimeMicroseconds, "Time of waiting fetching parquet data", ValueType::Microseconds) \ - M(ParquetMetaDataCacheHits, "Number of times the read from filesystem cache hit the cache.") \ - M(ParquetMetaDataCacheMisses, "Number of times the read from filesystem cache miss the cache.") \ + M(ParquetFetchWaitTimeMicroseconds, "Time of waiting fetching parquet data", ValueType::Microseconds) \ + \ + M(ParquetMetaDataCacheHits, "Number of times the read from filesystem cache hit the cache.", ValueType::Number) \ + M(ParquetMetaDataCacheMisses, "Number of times the read from filesystem cache miss the cache.", ValueType::Number) \ #ifdef APPLY_FOR_EXTERNAL_EVENTS #define APPLY_FOR_EVENTS(M) APPLY_FOR_BUILTIN_EVENTS(M) APPLY_FOR_EXTERNAL_EVENTS(M) diff --git a/src/Core/FormatFactorySettings.h b/src/Core/FormatFactorySettings.h index fcd9e6aa8f27..225e3a7f8594 100644 --- a/src/Core/FormatFactorySettings.h +++ b/src/Core/FormatFactorySettings.h @@ -1250,8 +1250,8 @@ Set the quoting rule for identifiers in SHOW CREATE query )", 0) \ DECLARE(IdentifierQuotingStyle, show_create_query_identifier_quoting_style, IdentifierQuotingStyle::Backticks, R"( Set the quoting style for identifiers in SHOW CREATE query -)", 0) \ - DECLARE(Bool, input_format_parquet_use_metadata_cache, false, "Enable parquet file metadata caching, 0) \ +)", 0) \ + DECLARE(Bool, input_format_parquet_use_metadata_cache, false, R"(Enable parquet file metadata caching)", 0) \ // End of FORMAT_FACTORY_SETTINGS #define OBSOLETE_FORMAT_SETTINGS(M, ALIAS) \ diff --git a/src/Processors/Formats/Impl/ParquetBlockInputFormat.cpp b/src/Processors/Formats/Impl/ParquetBlockInputFormat.cpp index db8ea7f00026..7e08bb71be96 100644 --- a/src/Processors/Formats/Impl/ParquetBlockInputFormat.cpp +++ b/src/Processors/Formats/Impl/ParquetBlockInputFormat.cpp @@ -54,6 +54,16 @@ namespace CurrentMetrics namespace DB { +namespace Setting +{ + extern const SettingsBool input_format_parquet_use_metadata_cache; +} + +namespace ServerSetting +{ + extern const ServerSettingsUInt64 input_format_parquet_metadata_cache_max_entries; +} + namespace ErrorCodes { extern const int BAD_ARGUMENTS; @@ -700,8 +710,8 @@ void ParquetBlockInputFormat::initializeIfNeeded() void ParquetBlockInputFormat::setStorageRelatedUniqueKey(const ServerSettings & server_settings, const Settings & settings, const String & key_) { metadata_cache.key = key_; - metadata_cache.use_cache = settings.input_format_parquet_use_metadata_cache; - metadata_cache.max_entries = server_settings.input_format_parquet_metadata_cache_max_entries; + metadata_cache.use_cache = settings[Setting::input_format_parquet_use_metadata_cache]; + metadata_cache.max_entries = server_settings[ServerSetting::input_format_parquet_metadata_cache_max_entries]; } void ParquetBlockInputFormat::initializeRowGroupBatchReader(size_t row_group_batch_idx) From 085cdfb65776cd2377595948b34e3c34359a56ba Mon Sep 17 00:00:00 2001 From: Arthur Passos Date: Fri, 10 Jan 2025 14:20:51 -0300 Subject: [PATCH 4/7] added max byte limit, still need to test --- src/Core/ServerSettings.cpp | 4 ++-- .../Formats/Impl/ParquetBlockInputFormat.cpp | 12 +++++++----- .../Formats/Impl/ParquetBlockInputFormat.h | 5 +++-- 3 files changed, 12 insertions(+), 9 deletions(-) diff --git a/src/Core/ServerSettings.cpp b/src/Core/ServerSettings.cpp index a2a933cf77d0..68e2e81a0049 100644 --- a/src/Core/ServerSettings.cpp +++ b/src/Core/ServerSettings.cpp @@ -213,8 +213,8 @@ namespace DB DECLARE(UInt64, iceberg_catalog_threadpool_pool_size, 50, "Size of background pool for iceberg catalog", 0) \ DECLARE(UInt64, iceberg_catalog_threadpool_queue_size, 1000000, "Number of tasks which is possible to push into iceberg catalog pool", 0) \ DECLARE(UInt32, allow_feature_tier, 0, "0 - All feature tiers allowed (experimental, beta, production). 1 - Only beta and production feature tiers allowed. 2 - Only production feature tier allowed", 0) \ - DECLARE(UInt64, input_format_parquet_metadata_cache_max_entries, 100000, "Maximum number of parquet file metadata to cache", 0) \ - + DECLARE(UInt64, input_format_parquet_metadata_cache_max_entries, 10000, "Maximum number of parquet file metadata to cache", 0) \ + DECLARE(UInt64, input_format_parquet_metadata_cache_max_size, 10000*50000, "Maximum size of parquet file metadata cache", 0) \ // clang-format on /// If you add a setting which can be updated at runtime, please update 'changeable_settings' map in dumpToSystemServerSettingsColumns below diff --git a/src/Processors/Formats/Impl/ParquetBlockInputFormat.cpp b/src/Processors/Formats/Impl/ParquetBlockInputFormat.cpp index 7e08bb71be96..ca0a75c067e9 100644 --- a/src/Processors/Formats/Impl/ParquetBlockInputFormat.cpp +++ b/src/Processors/Formats/Impl/ParquetBlockInputFormat.cpp @@ -62,6 +62,7 @@ namespace Setting namespace ServerSetting { extern const ServerSettingsUInt64 input_format_parquet_metadata_cache_max_entries; + extern const ServerSettingsUInt64 input_format_parquet_metadata_cache_max_size; } namespace ErrorCodes @@ -499,12 +500,12 @@ static std::vector getHyperrectangleForRowGroup(const parquet::FileMetaDa return hyperrectangle; } -ParquetFileMetaDataCache::ParquetFileMetaDataCache(UInt64 max_cache_entries) - : CacheBase(max_cache_entries) {} +ParquetFileMetaDataCache::ParquetFileMetaDataCache(UInt64 max_size_bytes, UInt64 max_entries) + : CacheBase(max_size_bytes, max_entries) {} -ParquetFileMetaDataCache * ParquetFileMetaDataCache::instance(UInt64 max_cache_entries) +ParquetFileMetaDataCache * ParquetFileMetaDataCache::instance(UInt64 max_size_bytes, UInt64 max_entries) { - static ParquetFileMetaDataCache instance(max_cache_entries); + static ParquetFileMetaDataCache instance(max_size_bytes, max_entries); return &instance; } @@ -524,7 +525,7 @@ std::shared_ptr ParquetBlockInputFormat::getFileMetaData( return readMetadataFromFile(); } - auto [parquet_file_metadata, loaded] = ParquetFileMetaDataCache::instance(metadata_cache.max_entries)->getOrSet( + auto [parquet_file_metadata, loaded] = ParquetFileMetaDataCache::instance(metadata_cache.max_size_bytes, metadata_cache.max_entries)->getOrSet( metadata_cache.key, [&]() { @@ -711,6 +712,7 @@ void ParquetBlockInputFormat::setStorageRelatedUniqueKey(const ServerSettings & { metadata_cache.key = key_; metadata_cache.use_cache = settings[Setting::input_format_parquet_use_metadata_cache]; + metadata_cache.max_size_bytes = server_settings[ServerSetting::input_format_parquet_metadata_cache_max_size]; metadata_cache.max_entries = server_settings[ServerSetting::input_format_parquet_metadata_cache_max_entries]; } diff --git a/src/Processors/Formats/Impl/ParquetBlockInputFormat.h b/src/Processors/Formats/Impl/ParquetBlockInputFormat.h index d0cc86a9c794..35ccc5765f23 100644 --- a/src/Processors/Formats/Impl/ParquetBlockInputFormat.h +++ b/src/Processors/Formats/Impl/ParquetBlockInputFormat.h @@ -351,6 +351,7 @@ class ParquetBlockInputFormat : public IInputFormat String key; bool use_cache = false; UInt64 max_entries{0}; + UInt64 max_size_bytes{0}; } metadata_cache; }; @@ -373,11 +374,11 @@ class ParquetSchemaReader : public ISchemaReader class ParquetFileMetaDataCache : public CacheBase { public: - static ParquetFileMetaDataCache * instance(UInt64 max_cache_entries); + static ParquetFileMetaDataCache * instance(UInt64 max_size_bytes, UInt64 max_entries); void clear() {} private: - ParquetFileMetaDataCache(UInt64 max_cache_entries); + ParquetFileMetaDataCache(UInt64 max_size_bytes, UInt64 max_entries); }; } From 53b6fc3ecc92cb7d7f16d9f88400c1f9f56af0de Mon Sep 17 00:00:00 2001 From: Arthur Passos Date: Sat, 11 Jan 2025 09:25:46 -0300 Subject: [PATCH 5/7] remove max_entries cache --- src/Core/ServerSettings.cpp | 1 - .../Formats/Impl/ParquetBlockInputFormat.cpp | 12 +++++------- .../Formats/Impl/ParquetBlockInputFormat.h | 1 - 3 files changed, 5 insertions(+), 9 deletions(-) diff --git a/src/Core/ServerSettings.cpp b/src/Core/ServerSettings.cpp index 68e2e81a0049..e4bb00a222f7 100644 --- a/src/Core/ServerSettings.cpp +++ b/src/Core/ServerSettings.cpp @@ -213,7 +213,6 @@ namespace DB DECLARE(UInt64, iceberg_catalog_threadpool_pool_size, 50, "Size of background pool for iceberg catalog", 0) \ DECLARE(UInt64, iceberg_catalog_threadpool_queue_size, 1000000, "Number of tasks which is possible to push into iceberg catalog pool", 0) \ DECLARE(UInt32, allow_feature_tier, 0, "0 - All feature tiers allowed (experimental, beta, production). 1 - Only beta and production feature tiers allowed. 2 - Only production feature tier allowed", 0) \ - DECLARE(UInt64, input_format_parquet_metadata_cache_max_entries, 10000, "Maximum number of parquet file metadata to cache", 0) \ DECLARE(UInt64, input_format_parquet_metadata_cache_max_size, 10000*50000, "Maximum size of parquet file metadata cache", 0) \ // clang-format on diff --git a/src/Processors/Formats/Impl/ParquetBlockInputFormat.cpp b/src/Processors/Formats/Impl/ParquetBlockInputFormat.cpp index ca0a75c067e9..50d4407b876f 100644 --- a/src/Processors/Formats/Impl/ParquetBlockInputFormat.cpp +++ b/src/Processors/Formats/Impl/ParquetBlockInputFormat.cpp @@ -61,7 +61,6 @@ namespace Setting namespace ServerSetting { - extern const ServerSettingsUInt64 input_format_parquet_metadata_cache_max_entries; extern const ServerSettingsUInt64 input_format_parquet_metadata_cache_max_size; } @@ -500,12 +499,12 @@ static std::vector getHyperrectangleForRowGroup(const parquet::FileMetaDa return hyperrectangle; } -ParquetFileMetaDataCache::ParquetFileMetaDataCache(UInt64 max_size_bytes, UInt64 max_entries) - : CacheBase(max_size_bytes, max_entries) {} +ParquetFileMetaDataCache::ParquetFileMetaDataCache(UInt64 max_size_bytes) + : CacheBase(max_size_bytes) {} -ParquetFileMetaDataCache * ParquetFileMetaDataCache::instance(UInt64 max_size_bytes, UInt64 max_entries) +ParquetFileMetaDataCache * ParquetFileMetaDataCache::instance(UInt64 max_size_bytes) { - static ParquetFileMetaDataCache instance(max_size_bytes, max_entries); + static ParquetFileMetaDataCache instance(max_size_bytes); return &instance; } @@ -525,7 +524,7 @@ std::shared_ptr ParquetBlockInputFormat::getFileMetaData( return readMetadataFromFile(); } - auto [parquet_file_metadata, loaded] = ParquetFileMetaDataCache::instance(metadata_cache.max_size_bytes, metadata_cache.max_entries)->getOrSet( + auto [parquet_file_metadata, loaded] = ParquetFileMetaDataCache::instance(metadata_cache.max_size_bytes)->getOrSet( metadata_cache.key, [&]() { @@ -713,7 +712,6 @@ void ParquetBlockInputFormat::setStorageRelatedUniqueKey(const ServerSettings & metadata_cache.key = key_; metadata_cache.use_cache = settings[Setting::input_format_parquet_use_metadata_cache]; metadata_cache.max_size_bytes = server_settings[ServerSetting::input_format_parquet_metadata_cache_max_size]; - metadata_cache.max_entries = server_settings[ServerSetting::input_format_parquet_metadata_cache_max_entries]; } void ParquetBlockInputFormat::initializeRowGroupBatchReader(size_t row_group_batch_idx) diff --git a/src/Processors/Formats/Impl/ParquetBlockInputFormat.h b/src/Processors/Formats/Impl/ParquetBlockInputFormat.h index 35ccc5765f23..974d0d1500ff 100644 --- a/src/Processors/Formats/Impl/ParquetBlockInputFormat.h +++ b/src/Processors/Formats/Impl/ParquetBlockInputFormat.h @@ -350,7 +350,6 @@ class ParquetBlockInputFormat : public IInputFormat { String key; bool use_cache = false; - UInt64 max_entries{0}; UInt64 max_size_bytes{0}; } metadata_cache; }; From c149d8e7d395065b28b8a18d7c518dba7641b69c Mon Sep 17 00:00:00 2001 From: Arthur Passos Date: Mon, 13 Jan 2025 08:57:08 -0300 Subject: [PATCH 6/7] :D --- src/Processors/Formats/Impl/ParquetBlockInputFormat.h | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/Processors/Formats/Impl/ParquetBlockInputFormat.h b/src/Processors/Formats/Impl/ParquetBlockInputFormat.h index 974d0d1500ff..d9a8d82e5cd6 100644 --- a/src/Processors/Formats/Impl/ParquetBlockInputFormat.h +++ b/src/Processors/Formats/Impl/ParquetBlockInputFormat.h @@ -373,11 +373,11 @@ class ParquetSchemaReader : public ISchemaReader class ParquetFileMetaDataCache : public CacheBase { public: - static ParquetFileMetaDataCache * instance(UInt64 max_size_bytes, UInt64 max_entries); + static ParquetFileMetaDataCache * instance(UInt64 max_size_bytes); void clear() {} private: - ParquetFileMetaDataCache(UInt64 max_size_bytes, UInt64 max_entries); + ParquetFileMetaDataCache(UInt64 max_size_bytes); }; } From 29ef51213b82a2ad4dcd46e8deb3182b0266a844 Mon Sep 17 00:00:00 2001 From: Arthur Passos Date: Tue, 14 Jan 2025 16:14:04 -0300 Subject: [PATCH 7/7] Update ServerSettings.cpp --- src/Core/ServerSettings.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Core/ServerSettings.cpp b/src/Core/ServerSettings.cpp index e4bb00a222f7..20eeaba86e46 100644 --- a/src/Core/ServerSettings.cpp +++ b/src/Core/ServerSettings.cpp @@ -213,7 +213,7 @@ namespace DB DECLARE(UInt64, iceberg_catalog_threadpool_pool_size, 50, "Size of background pool for iceberg catalog", 0) \ DECLARE(UInt64, iceberg_catalog_threadpool_queue_size, 1000000, "Number of tasks which is possible to push into iceberg catalog pool", 0) \ DECLARE(UInt32, allow_feature_tier, 0, "0 - All feature tiers allowed (experimental, beta, production). 1 - Only beta and production feature tiers allowed. 2 - Only production feature tier allowed", 0) \ - DECLARE(UInt64, input_format_parquet_metadata_cache_max_size, 10000*50000, "Maximum size of parquet file metadata cache", 0) \ + DECLARE(UInt64, input_format_parquet_metadata_cache_max_size, 500000000, "Maximum size of parquet file metadata cache", 0) \ // clang-format on /// If you add a setting which can be updated at runtime, please update 'changeable_settings' map in dumpToSystemServerSettingsColumns below