Skip to content

Commit 8bcbf47

Browse files
authored
Merge pull request #953 from Altinity/feature/antalya-25.6.5/frontport_860
25.6.5 Antalya port of #860: Support different warehouses behind Iceberg REST catalog
2 parents dfa56f4 + ed93cb0 commit 8bcbf47

File tree

10 files changed

+121
-19
lines changed

10 files changed

+121
-19
lines changed

src/Databases/DataLake/ICatalog.cpp

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -70,13 +70,19 @@ void TableMetadata::setLocation(const std::string & location_)
7070
auto pos_to_path = location_.substr(pos_to_bucket).find('/');
7171

7272
if (pos_to_path == std::string::npos)
73-
throw DB::Exception(DB::ErrorCodes::NOT_IMPLEMENTED, "Unexpected location format: {}", location_);
74-
75-
pos_to_path = pos_to_bucket + pos_to_path;
73+
{ // empty path
74+
location_without_path = location_;
75+
path.clear();
76+
bucket = location_.substr(pos_to_bucket);
77+
}
78+
else
79+
{
80+
pos_to_path = pos_to_bucket + pos_to_path;
7681

77-
location_without_path = location_.substr(0, pos_to_path);
78-
path = location_.substr(pos_to_path + 1);
79-
bucket = location_.substr(pos_to_bucket, pos_to_path - pos_to_bucket);
82+
location_without_path = location_.substr(0, pos_to_path);
83+
path = location_.substr(pos_to_path + 1);
84+
bucket = location_.substr(pos_to_bucket, pos_to_path - pos_to_bucket);
85+
}
8086

8187
LOG_TEST(getLogger("TableMetadata"),
8288
"Parsed location without path: {}, path: {}",

src/IO/S3/Client.cpp

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -384,7 +384,7 @@ Model::HeadObjectOutcome Client::HeadObject(HeadObjectRequest & request) const
384384
auto bucket_uri = getURIForBucket(bucket);
385385
if (!bucket_uri)
386386
{
387-
if (auto maybe_error = updateURIForBucketForHead(bucket); maybe_error.has_value())
387+
if (auto maybe_error = updateURIForBucketForHead(bucket, request.GetKey()); maybe_error.has_value())
388388
return *maybe_error;
389389

390390
if (auto region = getRegionForBucket(bucket); !region.empty())
@@ -589,7 +589,6 @@ Client::doRequest(RequestType & request, RequestFn request_fn) const
589589
if (auto uri = getURIForBucket(bucket); uri.has_value())
590590
request.overrideURI(std::move(*uri));
591591

592-
593592
bool found_new_endpoint = false;
594593
// if we found correct endpoint after 301 responses, update the cache for future requests
595594
SCOPE_EXIT(
@@ -869,12 +868,15 @@ std::optional<S3::URI> Client::getURIFromError(const Aws::S3::S3Error & error) c
869868
}
870869

871870
// Do a list request because head requests don't have body in response
872-
std::optional<Aws::S3::S3Error> Client::updateURIForBucketForHead(const std::string & bucket) const
871+
// S3 Tables don't support ListObjects, so made dirty workaroung - changed on GetObject
872+
std::optional<Aws::S3::S3Error> Client::updateURIForBucketForHead(const std::string & bucket, const std::string & key) const
873873
{
874-
ListObjectsV2Request req;
874+
GetObjectRequest req;
875875
req.SetBucket(bucket);
876-
req.SetMaxKeys(1);
877-
auto result = ListObjectsV2(req);
876+
req.SetKey(key);
877+
req.SetRange("bytes=0-1");
878+
auto result = GetObject(req);
879+
878880
if (result.IsSuccess())
879881
return std::nullopt;
880882
return result.GetError();

src/IO/S3/Client.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -279,7 +279,7 @@ class Client : private Aws::S3::S3Client
279279

280280
void updateURIForBucket(const std::string & bucket, S3::URI new_uri) const;
281281
std::optional<S3::URI> getURIFromError(const Aws::S3::S3Error & error) const;
282-
std::optional<Aws::S3::S3Error> updateURIForBucketForHead(const std::string & bucket) const;
282+
std::optional<Aws::S3::S3Error> updateURIForBucketForHead(const std::string & bucket, const std::string & key) const;
283283

284284
std::optional<S3::URI> getURIForBucket(const std::string & bucket) const;
285285

src/IO/S3/URI.cpp

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -158,10 +158,72 @@ URI::URI(const std::string & uri_, bool allow_archive_path_syntax)
158158
validateKey(key, uri);
159159
}
160160

161+
bool URI::isAWSRegion(std::string_view region)
162+
{
163+
/// List from https://docs.aws.amazon.com/general/latest/gr/s3.html
164+
static const std::unordered_set<std::string_view> regions = {
165+
"us-east-2",
166+
"us-east-1",
167+
"us-west-1",
168+
"us-west-2",
169+
"af-south-1",
170+
"ap-east-1",
171+
"ap-south-2",
172+
"ap-southeast-3",
173+
"ap-southeast-5",
174+
"ap-southeast-4",
175+
"ap-south-1",
176+
"ap-northeast-3",
177+
"ap-northeast-2",
178+
"ap-southeast-1",
179+
"ap-southeast-2",
180+
"ap-east-2",
181+
"ap-southeast-7",
182+
"ap-northeast-1",
183+
"ca-central-1",
184+
"ca-west-1",
185+
"eu-central-1",
186+
"eu-west-1",
187+
"eu-west-2",
188+
"eu-south-1",
189+
"eu-west-3",
190+
"eu-south-2",
191+
"eu-north-1",
192+
"eu-central-2",
193+
"il-central-1",
194+
"mx-central-1",
195+
"me-south-1",
196+
"me-central-1",
197+
"sa-east-1",
198+
"us-gov-east-1",
199+
"us-gov-west-1"
200+
};
201+
202+
/// 's3-us-west-2' is a legacy region format for S3 storage, equals to 'us-west-2'
203+
/// See https://docs.aws.amazon.com/AmazonS3/latest/userguide/VirtualHosting.html#VirtualHostingBackwardsCompatibility
204+
if (region.substr(0, 3) == "s3-")
205+
region = region.substr(3);
206+
207+
return regions.contains(region);
208+
}
209+
161210
void URI::addRegionToURI(const std::string &region)
162211
{
163212
if (auto pos = endpoint.find(".amazonaws.com"); pos != std::string::npos)
213+
{
214+
if (pos > 0)
215+
{ /// Check if region is already in endpoint to avoid add it second time
216+
auto prev_pos = endpoint.find_last_of("/.", pos - 1);
217+
if (prev_pos == std::string::npos)
218+
prev_pos = 0;
219+
else
220+
++prev_pos;
221+
std::string_view endpoint_region = std::string_view(endpoint).substr(prev_pos, pos - prev_pos);
222+
if (isAWSRegion(endpoint_region))
223+
return;
224+
}
164225
endpoint = endpoint.substr(0, pos) + "." + region + endpoint.substr(pos);
226+
}
165227
}
166228

167229
void URI::validateBucket(const String & bucket, const Poco::URI & uri)

src/IO/S3/URI.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,10 @@ struct URI
4242
static void validateBucket(const std::string & bucket, const Poco::URI & uri);
4343
static void validateKey(const std::string & key, const Poco::URI & uri);
4444

45+
/// Returns true if 'region' string is an AWS S3 region
46+
/// https://docs.aws.amazon.com/general/latest/gr/s3.html
47+
static bool isAWSRegion(std::string_view region);
48+
4549
private:
4650
std::pair<std::string, std::optional<std::string>> getURIAndArchivePattern(const std::string & source);
4751
};

src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -528,7 +528,7 @@ void IcebergMetadata::updateSnapshot(ContextPtr local_context, Poco::JSON::Objec
528528

529529
relevant_snapshot = IcebergSnapshot{
530530
getManifestList(local_context, getProperFilePathFromMetadataInfo(
531-
snapshot->getValue<String>(f_manifest_list), configuration_ptr->getPathForRead().path, table_location)),
531+
snapshot->getValue<String>(f_manifest_list), configuration_ptr->getPathForRead().path, table_location, configuration_ptr->getNamespace())),
532532
relevant_snapshot_id, total_rows, total_bytes};
533533

534534
if (!snapshot->has(f_schema_id))
@@ -710,7 +710,7 @@ ManifestFileCacheKeys IcebergMetadata::getManifestList(ContextPtr local_context,
710710
for (size_t i = 0; i < manifest_list_deserializer.rows(); ++i)
711711
{
712712
const std::string file_path = manifest_list_deserializer.getValueFromRowByName(i, f_manifest_path, TypeIndex::String).safeGet<std::string>();
713-
const auto manifest_file_name = getProperFilePathFromMetadataInfo(file_path, configuration_ptr->getPathForRead().path, table_location);
713+
const auto manifest_file_name = getProperFilePathFromMetadataInfo(file_path, configuration_ptr->getPathForRead().path, table_location, configuration_ptr->getNamespace());
714714
Int64 added_sequence_number = 0;
715715
if (format_version > 1)
716716
added_sequence_number = manifest_list_deserializer.getValueFromRowByName(i, f_sequence_number, TypeIndex::Int64).safeGet<Int64>();
@@ -846,6 +846,7 @@ ManifestFilePtr IcebergMetadata::getManifestFile(ContextPtr local_context, const
846846
schema_processor,
847847
inherited_sequence_number,
848848
table_location,
849+
configuration_ptr->getNamespace(),
849850
local_context);
850851
};
851852

src/Storages/ObjectStorage/DataLakes/Iceberg/ManifestFile.cpp

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,7 @@ ManifestFileContent::ManifestFileContent(
128128
const IcebergSchemaProcessor & schema_processor,
129129
Int64 inherited_sequence_number,
130130
const String & table_location,
131+
const String & common_namespace,
131132
DB::ContextPtr context)
132133
{
133134
this->schema_id = schema_id_;
@@ -192,7 +193,11 @@ ManifestFileContent::ManifestFileContent(
192193
}
193194
const auto status = ManifestEntryStatus(manifest_file_deserializer.getValueFromRowByName(i, f_status, TypeIndex::Int32).safeGet<UInt64>());
194195

195-
const auto file_path = getProperFilePathFromMetadataInfo(manifest_file_deserializer.getValueFromRowByName(i, c_data_file_file_path, TypeIndex::String).safeGet<String>(), common_path, table_location);
196+
const auto file_path = getProperFilePathFromMetadataInfo(
197+
manifest_file_deserializer.getValueFromRowByName(i, c_data_file_file_path, TypeIndex::String).safeGet<String>(),
198+
common_path,
199+
table_location,
200+
common_namespace);
196201

197202
/// NOTE: This is weird, because in manifest file partition looks like this:
198203
/// {

src/Storages/ObjectStorage/DataLakes/Iceberg/ManifestFile.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,7 @@ class ManifestFileContent
9494
const DB::IcebergSchemaProcessor & schema_processor,
9595
Int64 inherited_sequence_number,
9696
const std::string & table_location,
97+
const std::string & common_namespace,
9798
DB::ContextPtr context);
9899

99100
const std::vector<ManifestFileEntry> & getFiles() const;

src/Storages/ObjectStorage/DataLakes/Iceberg/Utils.cpp

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,11 @@ using namespace DB;
2828
// This function is used to get the file path inside the directory which corresponds to iceberg table from the full blob path which is written in manifest and metadata files.
2929
// For example, if the full blob path is s3://bucket/table_name/data/00000-1-1234567890.avro, the function will return table_name/data/00000-1-1234567890.avro
3030
// Common path should end with "<table_name>" or "<table_name>/".
31-
std::string getProperFilePathFromMetadataInfo(std::string_view data_path, std::string_view common_path, std::string_view table_location)
31+
std::string getProperFilePathFromMetadataInfo(
32+
std::string_view data_path,
33+
std::string_view common_path,
34+
std::string_view table_location,
35+
std::string_view common_namespace)
3236
{
3337
auto trim_backward_slash = [](std::string_view str) -> std::string_view
3438
{
@@ -84,7 +88,20 @@ std::string getProperFilePathFromMetadataInfo(std::string_view data_path, std::s
8488
}
8589
else
8690
{
87-
throw ::DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Expected to find '{}' in data path: '{}'", common_path, data_path);
91+
/// Data files can have different path
92+
pos = data_path.find("://");
93+
if (pos == std::string::npos)
94+
throw ::DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Unexpected data path: '{}'", data_path);
95+
pos = data_path.find('/', pos + 3);
96+
if (pos == std::string::npos)
97+
throw ::DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Unexpected data path: '{}'", data_path);
98+
if (data_path.substr(pos + 1).starts_with(common_namespace))
99+
{
100+
auto new_pos = data_path.find('/', pos + 1);
101+
if (new_pos - pos == common_namespace.length() + 1) /// bucket in the path
102+
pos = new_pos;
103+
}
104+
return std::string(data_path.substr(pos));
88105
}
89106
}
90107

src/Storages/ObjectStorage/DataLakes/Iceberg/Utils.h

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,11 @@
1010
namespace Iceberg
1111
{
1212

13-
std::string getProperFilePathFromMetadataInfo(std::string_view data_path, std::string_view common_path, std::string_view table_location);
13+
std::string getProperFilePathFromMetadataInfo(
14+
std::string_view data_path,
15+
std::string_view common_path,
16+
std::string_view table_location,
17+
std::string_view common_namespace);
1418

1519
}
1620

0 commit comments

Comments
 (0)