Skip to content

Commit e3f0ac0

Browse files
committed
Merge pull request #883 from Altinity/backports/25.3/81451_iceberg_support_compressed_metadata
1 parent 8bcbf47 commit e3f0ac0

File tree

2 files changed

+94
-18
lines changed

2 files changed

+94
-18
lines changed

src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp

Lines changed: 52 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
#include <Storages/ObjectStorage/DataLakes/DataLakeStorageSettings.h>
1616
#include "Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadataFilesCache.h"
1717
#include <Interpreters/ExpressionActions.h>
18+
#include <IO/CompressedReadBufferWrapper.h>
1819

1920
#include <Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.h>
2021
#include <Storages/ObjectStorage/DataLakes/Iceberg/Utils.h>
@@ -104,7 +105,8 @@ Poco::JSON::Object::Ptr getMetadataJSONObject(
104105
StorageObjectStorage::ConfigurationPtr configuration_ptr,
105106
IcebergMetadataFilesCachePtr cache_ptr,
106107
const ContextPtr & local_context,
107-
LoggerPtr log)
108+
LoggerPtr log,
109+
CompressionMethod compression_method)
108110
{
109111
auto create_fn = [&]()
110112
{
@@ -115,7 +117,14 @@ Poco::JSON::Object::Ptr getMetadataJSONObject(
115117
if (cache_ptr)
116118
read_settings.enable_filesystem_cache = false;
117119

118-
auto buf = StorageObjectStorageSource::createReadBuffer(object_info, object_storage, local_context, log, read_settings);
120+
auto source_buf = StorageObjectStorageSource::createReadBuffer(object_info, object_storage, local_context, log);
121+
122+
std::unique_ptr<ReadBuffer> buf;
123+
if (compression_method != CompressionMethod::None)
124+
buf = wrapReadBufferWithCompressionMethod(std::move(source_buf), compression_method);
125+
else
126+
buf = std::move(source_buf);
127+
119128
String json_str;
120129
readJSONObjectPossiblyInvalid(json_str, *buf);
121130
return json_str;
@@ -274,7 +283,30 @@ Int32 IcebergMetadata::parseTableSchema(
274283
}
275284
}
276285

277-
static std::pair<Int32, String> getMetadataFileAndVersion(const std::string & path)
286+
/// Describes one resolved Iceberg metadata file: its version, its path and the compression method used to read it.
struct MetadataFileWithInfo
287+
{
288+
Int32 version; /// Metadata version; getMetadataFileAndVersion sets it to std::stoi(version_str).
289+
String path; /// Path of the metadata file.
290+
CompressionMethod compression_method; /// Computed from `path` by getCompressionMethodFromMetadataFile.
291+
};
292+
293+
/// Chooses the compression method for an Iceberg metadata file from its path.
/// First asks chooseCompressionMethod(path, "auto") about the full path; if that yields CompressionMethod::None
/// and the path ends with ".metadata.json", asks again about the path with that suffix removed.
/// The returned value may still be CompressionMethod::None.
static CompressionMethod getCompressionMethodFromMetadataFile(const String & path)
294+
{
295+
constexpr std::string_view metadata_suffix = ".metadata.json";
296+
297+
auto compression_method = chooseCompressionMethod(path, "auto");
298+
299+
/// NOTE: some metadata files carry the compression extension not at the end of the file name,
300+
/// but in the middle of it, right before the ".metadata.json" suffix.
301+
/// Example: 00000-85befd5a-69c7-46d4-bca6-cfbd67f0f7e6.gz.metadata.json
302+
/// So when nothing was detected from the full path, strip that suffix and run the detection once more.
303+
if (compression_method == CompressionMethod::None && path.ends_with(metadata_suffix))
304+
compression_method = chooseCompressionMethod(path.substr(0, path.size() - metadata_suffix.size()), "auto");
305+
306+
return compression_method;
307+
}
308+
309+
static MetadataFileWithInfo getMetadataFileAndVersion(const std::string & path)
278310
{
279311
String file_name(path.begin() + path.find_last_of('/') + 1, path.end());
280312
String version_str;
@@ -289,7 +321,10 @@ static std::pair<Int32, String> getMetadataFileAndVersion(const std::string & pa
289321
throw Exception(
290322
ErrorCodes::BAD_ARGUMENTS, "Bad metadata file name: {}. Expected vN.metadata.json where N is a number", file_name);
291323

292-
return std::make_pair(std::stoi(version_str), path);
324+
return MetadataFileWithInfo{
325+
.version = std::stoi(version_str),
326+
.path = path,
327+
.compression_method = getCompressionMethodFromMetadataFile(path)};
293328
}
294329

295330
enum class MostRecentMetadataFileSelectionWay
@@ -300,7 +335,7 @@ enum class MostRecentMetadataFileSelectionWay
300335

301336
struct ShortMetadataFileInfo
302337
{
303-
UInt32 version;
338+
Int32 version;
304339
UInt64 last_updated_ms;
305340
String path;
306341
};
@@ -312,7 +347,7 @@ struct ShortMetadataFileInfo
312347
* 1) v<V>.metadata.json, where V - metadata version.
313348
* 2) <V>-<random-uuid>.metadata.json, where V - metadata version
314349
*/
315-
static std::pair<Int32, String> getLatestMetadataFileAndVersion(
350+
static MetadataFileWithInfo getLatestMetadataFileAndVersion(
316351
const ObjectStoragePtr & object_storage,
317352
StorageObjectStorage::ConfigurationPtr configuration_ptr,
318353
IcebergMetadataFilesCachePtr cache_ptr,
@@ -336,10 +371,10 @@ static std::pair<Int32, String> getLatestMetadataFileAndVersion(
336371
metadata_files_with_versions.reserve(metadata_files.size());
337372
for (const auto & path : metadata_files)
338373
{
339-
auto [version, metadata_file_path] = getMetadataFileAndVersion(path);
374+
auto [version, metadata_file_path, compression_method] = getMetadataFileAndVersion(path);
340375
if (need_all_metadata_files_parsing)
341376
{
342-
auto metadata_file_object = getMetadataJSONObject(metadata_file_path, object_storage, configuration_ptr, cache_ptr, local_context, log);
377+
auto metadata_file_object = getMetadataJSONObject(metadata_file_path, object_storage, configuration_ptr, cache_ptr, local_context, log, compression_method);
343378
if (table_uuid.has_value())
344379
{
345380
if (metadata_file_object->has(f_table_uuid))
@@ -389,10 +424,11 @@ static std::pair<Int32, String> getLatestMetadataFileAndVersion(
389424
[](const ShortMetadataFileInfo & a, const ShortMetadataFileInfo & b) { return a.version < b.version; });
390425
}
391426
}();
392-
return {latest_metadata_file_info.version, latest_metadata_file_info.path};
427+
428+
return {latest_metadata_file_info.version, latest_metadata_file_info.path, getCompressionMethodFromMetadataFile(latest_metadata_file_info.path)};
393429
}
394430

395-
static std::pair<Int32, String> getLatestOrExplicitMetadataFileAndVersion(
431+
static MetadataFileWithInfo getLatestOrExplicitMetadataFileAndVersion(
396432
const ObjectStoragePtr & object_storage,
397433
StorageObjectStorage::ConfigurationPtr configuration_ptr,
398434
IcebergMetadataFilesCachePtr cache_ptr,
@@ -459,7 +495,7 @@ bool IcebergMetadata::update(const ContextPtr & local_context)
459495

460496
std::lock_guard lock(mutex);
461497

462-
const auto [metadata_version, metadata_file_path]
498+
const auto [metadata_version, metadata_file_path, compression_method]
463499
= getLatestOrExplicitMetadataFileAndVersion(object_storage, configuration_ptr, manifest_cache, local_context, log.get());
464500

465501
bool metadata_file_changed = false;
@@ -469,7 +505,7 @@ bool IcebergMetadata::update(const ContextPtr & local_context)
469505
metadata_file_changed = true;
470506
}
471507

472-
auto metadata_object = getMetadataJSONObject(metadata_file_path, object_storage, configuration_ptr, manifest_cache, local_context, log);
508+
auto metadata_object = getMetadataJSONObject(metadata_file_path, object_storage, configuration_ptr, manifest_cache, local_context, log, compression_method);
473509
chassert(format_version == metadata_object->getValue<int>(f_format_version));
474510

475511
auto previous_snapshot_id = relevant_snapshot_id;
@@ -662,9 +698,9 @@ DataLakeMetadataPtr IcebergMetadata::create(
662698
else
663699
LOG_TRACE(log, "Not using in-memory cache for iceberg metadata files, because the setting use_iceberg_metadata_files_cache is false.");
664700

665-
const auto [metadata_version, metadata_file_path] = getLatestOrExplicitMetadataFileAndVersion(object_storage, configuration_ptr, cache_ptr, local_context, log.get());
701+
const auto [metadata_version, metadata_file_path, compression_method] = getLatestOrExplicitMetadataFileAndVersion(object_storage, configuration_ptr, cache_ptr, local_context, log.get());
666702

667-
Poco::JSON::Object::Ptr object = getMetadataJSONObject(metadata_file_path, object_storage, configuration_ptr, cache_ptr, local_context, log);
703+
Poco::JSON::Object::Ptr object = getMetadataJSONObject(metadata_file_path, object_storage, configuration_ptr, cache_ptr, local_context, log, compression_method);
668704

669705
auto format_version = object->getValue<int>(f_format_version);
670706
return std::make_unique<IcebergMetadata>(object_storage, configuration_ptr, local_context, metadata_version, format_version, object, cache_ptr);
@@ -734,15 +770,15 @@ IcebergMetadata::IcebergHistory IcebergMetadata::getHistory(ContextPtr local_con
734770
{
735771
auto configuration_ptr = configuration.lock();
736772

737-
const auto [metadata_version, metadata_file_path] = getLatestOrExplicitMetadataFileAndVersion(object_storage, configuration_ptr, manifest_cache, local_context, log.get());
773+
const auto [metadata_version, metadata_file_path, compression_method] = getLatestOrExplicitMetadataFileAndVersion(object_storage, configuration_ptr, manifest_cache, local_context, log.get());
738774

739775
chassert([&]()
740776
{
741777
SharedLockGuard lock(mutex);
742778
return metadata_version == last_metadata_version;
743779
}());
744780

745-
auto metadata_object = getMetadataJSONObject(metadata_file_path, object_storage, configuration_ptr, manifest_cache, local_context, log);
781+
auto metadata_object = getMetadataJSONObject(metadata_file_path, object_storage, configuration_ptr, manifest_cache, local_context, log, compression_method);
746782
chassert([&]()
747783
{
748784
SharedLockGuard lock(mutex);

tests/integration/test_storage_iceberg/test.py

Lines changed: 42 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import logging
22
import os
3+
import subprocess
34
import uuid
45
import time
56
from datetime import datetime, timezone
@@ -744,7 +745,7 @@ def make_query_from_function(
744745
# Cluster Query with node1 as coordinator and storage type as arg
745746
select_cluster_with_type_arg, query_id_cluster_with_type_arg = make_query_from_function(
746747
run_on_cluster=True,
747-
storage_type_as_arg=True,
748+
storage_type_as_arg=True,
748749
)
749750

750751
# Cluster Query with node1 as coordinator and storage type in named collection
@@ -2044,7 +2045,7 @@ def test_metadata_file_selection_from_version_hint(started_cluster, format_versi
20442045
spark.sql(
20452046
f"INSERT INTO {TABLE_NAME} select id, char(id + ascii('a')) from range(10)"
20462047
)
2047-
2048+
20482049
# test the case where version_hint.text file contains just the version number
20492050
with open(f"/iceberg_data/default/{TABLE_NAME}/metadata/version-hint.text", "w") as f:
20502051
f.write('5')
@@ -3402,3 +3403,42 @@ def execute_spark_query(query: str):
34023403
table_select_expression = table_creation_expression
34033404

34043405
instance.query(f"SELECT * FROM {table_select_expression} ORDER BY ALL")
3406+
3407+
3408+
@pytest.mark.parametrize("storage_type", ["local", "s3"])
3409+
def test_compressed_metadata(started_cluster, storage_type):
# Creates a two-row Iceberg table with Spark, gzips its v1 metadata file by hand, renames it to
# v1.gz.metadata.json, uploads the table directory and checks that node1 still reads both rows.
3410+
instance = started_cluster.instances["node1"]
3411+
spark = started_cluster.spark_session
3412+
TABLE_NAME = "test_compressed_metadata_" + storage_type + "_" + get_uuid_str()
3413+
3414+
# NOTE(review): table_properties is never used below; the property is passed through tableProperty() instead.
table_properties = {
3415+
"write.metadata.compression": "gzip"
3416+
}
3417+
3418+
df = spark.createDataFrame([
3419+
(1, "Alice"),
3420+
(2, "Bob")
3421+
], ["id", "name"])
3422+
3423+
# The write.metadata.compression property set below has no effect: the metadata is still written as plain v1.metadata.json.
# NOTE(review): Iceberg documents this table property as "write.metadata.compression-codec" - presumably that is the reason; confirm.
3424+
df.writeTo(TABLE_NAME) \
3425+
.tableProperty("write.metadata.compression", "gzip") \
3426+
.using("iceberg") \
3427+
.create()
3428+
3429+
# Compress the metadata file by hand before the upload, so that reading gzip-compressed metadata is still exercised.
3430+
subprocess.check_output(f"gzip /iceberg_data/default/{TABLE_NAME}/metadata/v1.metadata.json", shell=True)
3431+
3432+
# Rename so that the compression extension sits in the middle of the file name (v1.gz.metadata.json), not at the end.
3433+
subprocess.check_output(f"mv /iceberg_data/default/{TABLE_NAME}/metadata/v1.metadata.json.gz /iceberg_data/default/{TABLE_NAME}/metadata/v1.gz.metadata.json", shell=True)
3434+
3435+
default_upload_directory(
3436+
started_cluster,
3437+
storage_type,
3438+
f"/iceberg_data/default/{TABLE_NAME}/",
3439+
f"/iceberg_data/default/{TABLE_NAME}/",
3440+
)
3441+
3442+
create_iceberg_table(storage_type, instance, TABLE_NAME, started_cluster, explicit_metadata_path="")
3443+
3444+
assert instance.query(f"SELECT * FROM {TABLE_NAME} WHERE not ignore(*)") == "1\tAlice\n2\tBob\n"

0 commit comments

Comments
 (0)