Skip to content

Commit a6ad598

Browse files
authored
Merge pull request #860 from Altinity/feature/different_folder_for_data_lake
Antalya 25.3: Support different warehouses behind Iceberg REST catalog
2 parents dc82819 + 95f4f0f commit a6ad598

File tree

10 files changed

+121
-19
lines changed

10 files changed

+121
-19
lines changed

src/Databases/DataLake/ICatalog.cpp

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -67,13 +67,19 @@ void TableMetadata::setLocation(const std::string & location_)
6767
auto pos_to_path = location_.substr(pos_to_bucket).find('/');
6868

6969
if (pos_to_path == std::string::npos)
70-
throw DB::Exception(DB::ErrorCodes::NOT_IMPLEMENTED, "Unexpected location format: {}", location_);
71-
72-
pos_to_path = pos_to_bucket + pos_to_path;
70+
{ // empty path
71+
location_without_path = location_;
72+
path.clear();
73+
bucket = location_.substr(pos_to_bucket);
74+
}
75+
else
76+
{
77+
pos_to_path = pos_to_bucket + pos_to_path;
7378

74-
location_without_path = location_.substr(0, pos_to_path);
75-
path = location_.substr(pos_to_path + 1);
76-
bucket = location_.substr(pos_to_bucket, pos_to_path - pos_to_bucket);
79+
location_without_path = location_.substr(0, pos_to_path);
80+
path = location_.substr(pos_to_path + 1);
81+
bucket = location_.substr(pos_to_bucket, pos_to_path - pos_to_bucket);
82+
}
7783

7884
LOG_TEST(getLogger("TableMetadata"),
7985
"Parsed location without path: {}, path: {}",

src/IO/S3/Client.cpp

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -373,7 +373,7 @@ Model::HeadObjectOutcome Client::HeadObject(HeadObjectRequest & request) const
373373
auto bucket_uri = getURIForBucket(bucket);
374374
if (!bucket_uri)
375375
{
376-
if (auto maybe_error = updateURIForBucketForHead(bucket); maybe_error.has_value())
376+
if (auto maybe_error = updateURIForBucketForHead(bucket, request.GetKey()); maybe_error.has_value())
377377
return *maybe_error;
378378

379379
if (auto region = getRegionForBucket(bucket); !region.empty())
@@ -578,7 +578,6 @@ Client::doRequest(RequestType & request, RequestFn request_fn) const
578578
if (auto uri = getURIForBucket(bucket); uri.has_value())
579579
request.overrideURI(std::move(*uri));
580580

581-
582581
bool found_new_endpoint = false;
583582
// if we found correct endpoint after 301 responses, update the cache for future requests
584583
SCOPE_EXIT(
@@ -813,12 +812,15 @@ std::optional<S3::URI> Client::getURIFromError(const Aws::S3::S3Error & error) c
813812
}
814813

815814
// Do a list request because head requests don't have body in response
816-
std::optional<Aws::S3::S3Error> Client::updateURIForBucketForHead(const std::string & bucket) const
815+
// S3 Tables don't support ListObjects, so made dirty workaroung - changed on GetObject
816+
std::optional<Aws::S3::S3Error> Client::updateURIForBucketForHead(const std::string & bucket, const std::string & key) const
817817
{
818-
ListObjectsV2Request req;
818+
GetObjectRequest req;
819819
req.SetBucket(bucket);
820-
req.SetMaxKeys(1);
821-
auto result = ListObjectsV2(req);
820+
req.SetKey(key);
821+
req.SetRange("bytes=0-1");
822+
auto result = GetObject(req);
823+
822824
if (result.IsSuccess())
823825
return std::nullopt;
824826
return result.GetError();

src/IO/S3/Client.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -269,7 +269,7 @@ class Client : private Aws::S3::S3Client
269269

270270
void updateURIForBucket(const std::string & bucket, S3::URI new_uri) const;
271271
std::optional<S3::URI> getURIFromError(const Aws::S3::S3Error & error) const;
272-
std::optional<Aws::S3::S3Error> updateURIForBucketForHead(const std::string & bucket) const;
272+
std::optional<Aws::S3::S3Error> updateURIForBucketForHead(const std::string & bucket, const std::string & key) const;
273273

274274
std::optional<S3::URI> getURIForBucket(const std::string & bucket) const;
275275

src/IO/S3/URI.cpp

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -157,10 +157,72 @@ URI::URI(const std::string & uri_, bool allow_archive_path_syntax)
157157
}
158158
}
159159

160+
bool URI::isAWSRegion(std::string_view region)
161+
{
162+
/// List from https://docs.aws.amazon.com/general/latest/gr/s3.html
163+
static const std::unordered_set<std::string_view> regions = {
164+
"us-east-2",
165+
"us-east-1",
166+
"us-west-1",
167+
"us-west-2",
168+
"af-south-1",
169+
"ap-east-1",
170+
"ap-south-2",
171+
"ap-southeast-3",
172+
"ap-southeast-5",
173+
"ap-southeast-4",
174+
"ap-south-1",
175+
"ap-northeast-3",
176+
"ap-northeast-2",
177+
"ap-southeast-1",
178+
"ap-southeast-2",
179+
"ap-east-2",
180+
"ap-southeast-7",
181+
"ap-northeast-1",
182+
"ca-central-1",
183+
"ca-west-1",
184+
"eu-central-1",
185+
"eu-west-1",
186+
"eu-west-2",
187+
"eu-south-1",
188+
"eu-west-3",
189+
"eu-south-2",
190+
"eu-north-1",
191+
"eu-central-2",
192+
"il-central-1",
193+
"mx-central-1",
194+
"me-south-1",
195+
"me-central-1",
196+
"sa-east-1",
197+
"us-gov-east-1",
198+
"us-gov-west-1"
199+
};
200+
201+
/// 's3-us-west-2' is a legacy region format for S3 storage, equals to 'us-west-2'
202+
/// See https://docs.aws.amazon.com/AmazonS3/latest/userguide/VirtualHosting.html#VirtualHostingBackwardsCompatibility
203+
if (region.substr(0, 3) == "s3-")
204+
region = region.substr(3);
205+
206+
return regions.contains(region);
207+
}
208+
160209
void URI::addRegionToURI(const std::string &region)
161210
{
162211
if (auto pos = endpoint.find(".amazonaws.com"); pos != std::string::npos)
212+
{
213+
if (pos > 0)
214+
{ /// Check if region is already in endpoint to avoid add it second time
215+
auto prev_pos = endpoint.find_last_of("/.", pos - 1);
216+
if (prev_pos == std::string::npos)
217+
prev_pos = 0;
218+
else
219+
++prev_pos;
220+
std::string_view endpoint_region = std::string_view(endpoint).substr(prev_pos, pos - prev_pos);
221+
if (isAWSRegion(endpoint_region))
222+
return;
223+
}
163224
endpoint = endpoint.substr(0, pos) + "." + region + endpoint.substr(pos);
225+
}
164226
}
165227

166228
void URI::validateBucket(const String & bucket, const Poco::URI & uri)

src/IO/S3/URI.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,10 @@ struct URI
4141

4242
static void validateBucket(const std::string & bucket, const Poco::URI & uri);
4343

44+
/// Returns true if 'region' string is an AWS S3 region
45+
/// https://docs.aws.amazon.com/general/latest/gr/s3.html
46+
static bool isAWSRegion(std::string_view region);
47+
4448
private:
4549
std::pair<std::string, std::optional<std::string>> getURIAndArchivePattern(const std::string & source);
4650
};

src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -487,7 +487,7 @@ void IcebergMetadata::updateSnapshot()
487487

488488
relevant_snapshot = IcebergSnapshot{
489489
getManifestList(getProperFilePathFromMetadataInfo(
490-
snapshot->getValue<String>(MANIFEST_LIST_PATH_FIELD), configuration_ptr->getPath(), table_location)),
490+
snapshot->getValue<String>(MANIFEST_LIST_PATH_FIELD), configuration_ptr->getPath(), table_location, configuration_ptr->getNamespace())),
491491
relevant_snapshot_id, total_rows, total_bytes};
492492

493493
if (!snapshot->has("schema-id"))
@@ -654,7 +654,7 @@ ManifestListPtr IcebergMetadata::getManifestList(const String & filename) const
654654
for (size_t i = 0; i < manifest_list_deserializer.rows(); ++i)
655655
{
656656
const std::string file_path = manifest_list_deserializer.getValueFromRowByName(i, MANIFEST_FILE_PATH_COLUMN, TypeIndex::String).safeGet<std::string>();
657-
const auto manifest_file_name = getProperFilePathFromMetadataInfo(file_path, configuration_ptr->getPath(), table_location);
657+
const auto manifest_file_name = getProperFilePathFromMetadataInfo(file_path, configuration_ptr->getPath(), table_location, configuration_ptr->getNamespace());
658658
Int64 added_sequence_number = 0;
659659
if (format_version > 1)
660660
added_sequence_number = manifest_list_deserializer.getValueFromRowByName(i, SEQUENCE_NUMBER_COLUMN, TypeIndex::Int64).safeGet<Int64>();
@@ -706,6 +706,7 @@ ManifestFilePtr IcebergMetadata::getManifestFile(const String & filename, Int64
706706
schema_processor,
707707
inherited_sequence_number,
708708
table_location,
709+
configuration_ptr->getNamespace(),
709710
context);
710711
};
711712

src/Storages/ObjectStorage/DataLakes/Iceberg/ManifestFile.cpp

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,7 @@ ManifestFileContent::ManifestFileContent(
141141
const IcebergSchemaProcessor & schema_processor,
142142
Int64 inherited_sequence_number,
143143
const String & table_location,
144+
const String & common_namespace,
144145
DB::ContextPtr context)
145146
{
146147
this->schema_id = schema_id_;
@@ -205,7 +206,11 @@ ManifestFileContent::ManifestFileContent(
205206
}
206207
const auto status = ManifestEntryStatus(manifest_file_deserializer.getValueFromRowByName(i, COLUMN_STATUS_NAME, TypeIndex::Int32).safeGet<UInt64>());
207208

208-
const auto file_path = getProperFilePathFromMetadataInfo(manifest_file_deserializer.getValueFromRowByName(i, SUBCOLUMN_FILE_PATH_NAME, TypeIndex::String).safeGet<String>(), common_path, table_location);
209+
const auto file_path = getProperFilePathFromMetadataInfo(
210+
manifest_file_deserializer.getValueFromRowByName(i, SUBCOLUMN_FILE_PATH_NAME, TypeIndex::String).safeGet<String>(),
211+
common_path,
212+
table_location,
213+
common_namespace);
209214

210215
/// NOTE: This is weird, because in manifest file partition looks like this:
211216
/// {

src/Storages/ObjectStorage/DataLakes/Iceberg/ManifestFile.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,7 @@ class ManifestFileContent
9696
const DB::IcebergSchemaProcessor & schema_processor,
9797
Int64 inherited_sequence_number,
9898
const std::string & table_location,
99+
const std::string & common_namespace,
99100
DB::ContextPtr context);
100101

101102
const std::vector<ManifestFileEntry> & getFiles() const;

src/Storages/ObjectStorage/DataLakes/Iceberg/Utils.cpp

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,11 @@ using namespace DB;
2828
// This function is used to get the file path inside the directory which corresponds to iceberg table from the full blob path which is written in manifest and metadata files.
2929
// For example, if the full blob path is s3://bucket/table_name/data/00000-1-1234567890.avro, the function will return table_name/data/00000-1-1234567890.avro
3030
// Common path should end with "<table_name>" or "<table_name>/".
31-
std::string getProperFilePathFromMetadataInfo(std::string_view data_path, std::string_view common_path, std::string_view table_location)
31+
std::string getProperFilePathFromMetadataInfo(
32+
std::string_view data_path,
33+
std::string_view common_path,
34+
std::string_view table_location,
35+
std::string_view common_namespace)
3236
{
3337
auto trim_backward_slash = [](std::string_view str) -> std::string_view
3438
{
@@ -84,7 +88,20 @@ std::string getProperFilePathFromMetadataInfo(std::string_view data_path, std::s
8488
}
8589
else
8690
{
87-
throw ::DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Expected to find '{}' in data path: '{}'", common_path, data_path);
91+
/// Data files can have different path
92+
pos = data_path.find("://");
93+
if (pos == std::string::npos)
94+
throw ::DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Unexpected data path: '{}'", data_path);
95+
pos = data_path.find("/", pos + 3);
96+
if (pos == std::string::npos)
97+
throw ::DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Unexpected data path: '{}'", data_path);
98+
if (data_path.substr(pos + 1).starts_with(common_namespace))
99+
{
100+
auto new_pos = data_path.find("/", pos + 1);
101+
if (new_pos - pos == common_namespace.length() + 1) /// bucket in the path
102+
pos = new_pos;
103+
}
104+
return std::string(data_path.substr(pos));
88105
}
89106
}
90107

src/Storages/ObjectStorage/DataLakes/Iceberg/Utils.h

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,11 @@
1010
namespace Iceberg
1111
{
1212

13-
std::string getProperFilePathFromMetadataInfo(std::string_view data_path, std::string_view common_path, std::string_view table_location);
13+
std::string getProperFilePathFromMetadataInfo(
14+
std::string_view data_path,
15+
std::string_view common_path,
16+
std::string_view table_location,
17+
std::string_view common_namespace);
1418

1519
}
1620

0 commit comments

Comments
 (0)