diff --git a/src/Databases/DataLake/ICatalog.cpp b/src/Databases/DataLake/ICatalog.cpp index 8f611cd8fb21..a86279ec3c38 100644 --- a/src/Databases/DataLake/ICatalog.cpp +++ b/src/Databases/DataLake/ICatalog.cpp @@ -67,13 +67,19 @@ void TableMetadata::setLocation(const std::string & location_) auto pos_to_path = location_.substr(pos_to_bucket).find('/'); if (pos_to_path == std::string::npos) - throw DB::Exception(DB::ErrorCodes::NOT_IMPLEMENTED, "Unexpected location format: {}", location_); - - pos_to_path = pos_to_bucket + pos_to_path; + { // empty path + location_without_path = location_; + path.clear(); + bucket = location_.substr(pos_to_bucket); + } + else + { + pos_to_path = pos_to_bucket + pos_to_path; - location_without_path = location_.substr(0, pos_to_path); - path = location_.substr(pos_to_path + 1); - bucket = location_.substr(pos_to_bucket, pos_to_path - pos_to_bucket); + location_without_path = location_.substr(0, pos_to_path); + path = location_.substr(pos_to_path + 1); + bucket = location_.substr(pos_to_bucket, pos_to_path - pos_to_bucket); + } LOG_TEST(getLogger("TableMetadata"), "Parsed location without path: {}, path: {}", diff --git a/src/IO/S3/Client.cpp b/src/IO/S3/Client.cpp index b7bfef091148..445f5ab29194 100644 --- a/src/IO/S3/Client.cpp +++ b/src/IO/S3/Client.cpp @@ -373,7 +373,7 @@ Model::HeadObjectOutcome Client::HeadObject(HeadObjectRequest & request) const auto bucket_uri = getURIForBucket(bucket); if (!bucket_uri) { - if (auto maybe_error = updateURIForBucketForHead(bucket); maybe_error.has_value()) + if (auto maybe_error = updateURIForBucketForHead(bucket, request.GetKey()); maybe_error.has_value()) return *maybe_error; if (auto region = getRegionForBucket(bucket); !region.empty()) @@ -578,7 +578,6 @@ Client::doRequest(RequestType & request, RequestFn request_fn) const if (auto uri = getURIForBucket(bucket); uri.has_value()) request.overrideURI(std::move(*uri)); - bool found_new_endpoint = false; // if we found correct endpoint after 301 responses, update the cache for future requests SCOPE_EXIT( @@ -813,12 +812,15 @@ std::optional Client::getURIFromError(const Aws::S3::S3Error & error) c } // Do a list request because head requests don't have body in response -std::optional Client::updateURIForBucketForHead(const std::string & bucket) const +// S3 Tables don't support ListObjects, so made dirty workaroung - changed on GetObject +std::optional Client::updateURIForBucketForHead(const std::string & bucket, const std::string & key) const { - ListObjectsV2Request req; + GetObjectRequest req; req.SetBucket(bucket); - req.SetMaxKeys(1); - auto result = ListObjectsV2(req); + req.SetKey(key); + req.SetRange("bytes=0-1"); + auto result = GetObject(req); + if (result.IsSuccess()) return std::nullopt; return result.GetError(); diff --git a/src/IO/S3/Client.h b/src/IO/S3/Client.h index 1cdc74b13973..025417013fcf 100644 --- a/src/IO/S3/Client.h +++ b/src/IO/S3/Client.h @@ -269,7 +269,7 @@ class Client : private Aws::S3::S3Client void updateURIForBucket(const std::string & bucket, S3::URI new_uri) const; std::optional getURIFromError(const Aws::S3::S3Error & error) const; - std::optional updateURIForBucketForHead(const std::string & bucket) const; + std::optional updateURIForBucketForHead(const std::string & bucket, const std::string & key) const; std::optional getURIForBucket(const std::string & bucket) const; diff --git a/src/IO/S3/URI.cpp b/src/IO/S3/URI.cpp index e2a10b787335..0792e79a25ff 100644 --- a/src/IO/S3/URI.cpp +++ b/src/IO/S3/URI.cpp @@ -157,10 +157,72 @@ URI::URI(const std::string & uri_, bool allow_archive_path_syntax) } } +bool URI::isAWSRegion(std::string_view region) +{ + /// List from https://docs.aws.amazon.com/general/latest/gr/s3.html + static const std::unordered_set regions = { + "us-east-2", + "us-east-1", + "us-west-1", + "us-west-2", + "af-south-1", + "ap-east-1", + "ap-south-2", + "ap-southeast-3", + "ap-southeast-5", + "ap-southeast-4", + "ap-south-1", + "ap-northeast-3", + "ap-northeast-2", + "ap-southeast-1", + "ap-southeast-2", + "ap-east-2", + "ap-southeast-7", + "ap-northeast-1", + "ca-central-1", + "ca-west-1", + "eu-central-1", + "eu-west-1", + "eu-west-2", + "eu-south-1", + "eu-west-3", + "eu-south-2", + "eu-north-1", + "eu-central-2", + "il-central-1", + "mx-central-1", + "me-south-1", + "me-central-1", + "sa-east-1", + "us-gov-east-1", + "us-gov-west-1" + }; + + /// 's3-us-west-2' is a legacy region format for S3 storage, equals to 'us-west-2' + /// See https://docs.aws.amazon.com/AmazonS3/latest/userguide/VirtualHosting.html#VirtualHostingBackwardsCompatibility + if (region.substr(0, 3) == "s3-") + region = region.substr(3); + + return regions.contains(region); +} + void URI::addRegionToURI(const std::string ®ion) { if (auto pos = endpoint.find(".amazonaws.com"); pos != std::string::npos) + { + if (pos > 0) + { /// Check if region is already in endpoint to avoid add it second time + auto prev_pos = endpoint.find_last_of("/.", pos - 1); + if (prev_pos == std::string::npos) + prev_pos = 0; + else + ++prev_pos; + std::string_view endpoint_region = std::string_view(endpoint).substr(prev_pos, pos - prev_pos); + if (isAWSRegion(endpoint_region)) + return; + } endpoint = endpoint.substr(0, pos) + "." + region + endpoint.substr(pos); + } } void URI::validateBucket(const String & bucket, const Poco::URI & uri) diff --git a/src/IO/S3/URI.h b/src/IO/S3/URI.h index c8d0b28cd150..88a36874f4d9 100644 --- a/src/IO/S3/URI.h +++ b/src/IO/S3/URI.h @@ -41,6 +41,10 @@ struct URI static void validateBucket(const std::string & bucket, const Poco::URI & uri); + /// Returns true if 'region' string is an AWS S3 region + /// https://docs.aws.amazon.com/general/latest/gr/s3.html + static bool isAWSRegion(std::string_view region); + private: std::pair> getURIAndArchivePattern(const std::string & source); }; diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp index 255c37dd0365..bcf3aef8b26c 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp @@ -487,7 +487,7 @@ void IcebergMetadata::updateSnapshot() relevant_snapshot = IcebergSnapshot{ getManifestList(getProperFilePathFromMetadataInfo( - snapshot->getValue(MANIFEST_LIST_PATH_FIELD), configuration_ptr->getPath(), table_location)), + snapshot->getValue(MANIFEST_LIST_PATH_FIELD), configuration_ptr->getPath(), table_location, configuration_ptr->getNamespace())), relevant_snapshot_id, total_rows, total_bytes}; if (!snapshot->has("schema-id")) @@ -654,7 +654,7 @@ ManifestListPtr IcebergMetadata::getManifestList(const String & filename) const for (size_t i = 0; i < manifest_list_deserializer.rows(); ++i) { const std::string file_path = manifest_list_deserializer.getValueFromRowByName(i, MANIFEST_FILE_PATH_COLUMN, TypeIndex::String).safeGet(); - const auto manifest_file_name = getProperFilePathFromMetadataInfo(file_path, configuration_ptr->getPath(), table_location); + const auto manifest_file_name = getProperFilePathFromMetadataInfo(file_path, configuration_ptr->getPath(), table_location, configuration_ptr->getNamespace()); Int64 added_sequence_number = 0; if (format_version > 1) added_sequence_number = manifest_list_deserializer.getValueFromRowByName(i, SEQUENCE_NUMBER_COLUMN, TypeIndex::Int64).safeGet(); @@ -706,6 +706,7 @@ ManifestFilePtr IcebergMetadata::getManifestFile(const String & filename, Int64 schema_processor, inherited_sequence_number, table_location, + configuration_ptr->getNamespace(), context); }; diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/ManifestFile.cpp b/src/Storages/ObjectStorage/DataLakes/Iceberg/ManifestFile.cpp index 3f934cd2bf5b..c6dfb389e498 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/ManifestFile.cpp +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/ManifestFile.cpp @@ -141,6 +141,7 @@ ManifestFileContent::ManifestFileContent( const IcebergSchemaProcessor & schema_processor, Int64 inherited_sequence_number, const String & table_location, + const String & common_namespace, DB::ContextPtr context) { this->schema_id = schema_id_; @@ -205,7 +206,11 @@ ManifestFileContent::ManifestFileContent( } const auto status = ManifestEntryStatus(manifest_file_deserializer.getValueFromRowByName(i, COLUMN_STATUS_NAME, TypeIndex::Int32).safeGet()); - const auto file_path = getProperFilePathFromMetadataInfo(manifest_file_deserializer.getValueFromRowByName(i, SUBCOLUMN_FILE_PATH_NAME, TypeIndex::String).safeGet(), common_path, table_location); + const auto file_path = getProperFilePathFromMetadataInfo( + manifest_file_deserializer.getValueFromRowByName(i, SUBCOLUMN_FILE_PATH_NAME, TypeIndex::String).safeGet(), + common_path, + table_location, + common_namespace); /// NOTE: This is weird, because in manifest file partition looks like this: /// { diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/ManifestFile.h b/src/Storages/ObjectStorage/DataLakes/Iceberg/ManifestFile.h index df86b747d26f..3261f36c39be 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/ManifestFile.h +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/ManifestFile.h @@ -96,6 +96,7 @@ class ManifestFileContent const DB::IcebergSchemaProcessor & schema_processor, Int64 inherited_sequence_number, const std::string & table_location, + const std::string & common_namespace, DB::ContextPtr context); const std::vector & getFiles() const; diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/Utils.cpp b/src/Storages/ObjectStorage/DataLakes/Iceberg/Utils.cpp index 56d7b9565fb1..df2a8007e74c 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/Utils.cpp +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/Utils.cpp @@ -28,7 +28,11 @@ using namespace DB; // This function is used to get the file path inside the directory which corresponds to iceberg table from the full blob path which is written in manifest and metadata files. // For example, if the full blob path is s3://bucket/table_name/data/00000-1-1234567890.avro, the function will return table_name/data/00000-1-1234567890.avro // Common path should end with "" or "/". -std::string getProperFilePathFromMetadataInfo(std::string_view data_path, std::string_view common_path, std::string_view table_location) +std::string getProperFilePathFromMetadataInfo( + std::string_view data_path, + std::string_view common_path, + std::string_view table_location, + std::string_view common_namespace) { auto trim_backward_slash = [](std::string_view str) -> std::string_view { @@ -84,7 +88,20 @@ std::string getProperFilePathFromMetadataInfo(std::string_view data_path, std::s } else { - throw ::DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Expected to find '{}' in data path: '{}'", common_path, data_path); + /// Data files can have different path + pos = data_path.find("://"); + if (pos == std::string::npos) + throw ::DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Unexpected data path: '{}'", data_path); + pos = data_path.find("/", pos + 3); + if (pos == std::string::npos) + throw ::DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Unexpected data path: '{}'", data_path); + if (data_path.substr(pos + 1).starts_with(common_namespace)) + { + auto new_pos = data_path.find("/", pos + 1); + if (new_pos - pos == common_namespace.length() + 1) /// bucket in the path + pos = new_pos; + } + return std::string(data_path.substr(pos)); } } diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/Utils.h b/src/Storages/ObjectStorage/DataLakes/Iceberg/Utils.h index 432751be8832..300df4492aa6 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/Utils.h +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/Utils.h @@ -10,7 +10,11 @@ namespace Iceberg { -std::string getProperFilePathFromMetadataInfo(std::string_view data_path, std::string_view common_path, std::string_view table_location); +std::string getProperFilePathFromMetadataInfo( + std::string_view data_path, + std::string_view common_path, + std::string_view table_location, + std::string_view common_namespace); }