diff --git a/src/Analyzer/FunctionSecretArgumentsFinderTreeNode.h b/src/Analyzer/FunctionSecretArgumentsFinderTreeNode.h
index 8bcb6e147420..e4f63192c95b 100644
--- a/src/Analyzer/FunctionSecretArgumentsFinderTreeNode.h
+++ b/src/Analyzer/FunctionSecretArgumentsFinderTreeNode.h
@@ -71,8 +71,14 @@ class FunctionTreeNodeImpl : public AbstractFunction
 {
 public:
     explicit ArgumentsTreeNode(const QueryTreeNodes * arguments_) : arguments(arguments_) {}
-    size_t size() const override { return arguments ? arguments->size() : 0; }
-    std::unique_ptr<Argument> at(size_t n) const override { return std::make_unique<ArgumentTreeNode>(arguments->at(n).get()); }
+    size_t size() const override
+    { /// size without skipped indexes
+        return arguments ? arguments->size() - skippedSize() : 0;
+    }
+    std::unique_ptr<Argument> at(size_t n) const override
+    { /// n is a relative index, some can be skipped
+        return std::make_unique<ArgumentTreeNode>(arguments->at(getRealIndex(n)).get());
+    }
 private:
     const QueryTreeNodes * arguments = nullptr;
 };
diff --git a/src/Databases/Iceberg/DatabaseIceberg.cpp b/src/Databases/Iceberg/DatabaseIceberg.cpp
index 790c35a54efe..c42e10d9bd2d 100644
--- a/src/Databases/Iceberg/DatabaseIceberg.cpp
+++ b/src/Databases/Iceberg/DatabaseIceberg.cpp
@@ -230,7 +230,7 @@ StoragePtr DatabaseIceberg::tryGetTable(const String & name, ContextPtr context_
     /// with_table_structure = false: because there will be
     /// no table structure in table definition AST.
-    StorageObjectStorage::Configuration::initialize(*configuration, args, context_, /* with_table_structure */false, storage_settings.get());
+    configuration->initialize(args, context_, /* with_table_structure */false, storage_settings.get());
 
     auto cluster_name = settings[DatabaseIcebergSetting::object_storage_cluster].value;
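Note: both overrides above depend on the skipped-index bookkeeping that `AbstractFunction::Arguments` gains later in this diff (src/Parsers/FunctionSecretArgumentsFinder.h). A minimal, self-contained sketch of that relative-to-real index mapping (the values are hypothetical):

```cpp
#include <cassert>
#include <cstddef>
#include <set>

/// Standalone sketch of the mapping used by AbstractFunction::Arguments:
/// a relative index n (as seen by callers) is shifted past every skipped
/// real index that lies at or below the running position.
size_t getRealIndex(const std::set<size_t> & skipped, size_t n)
{
    for (auto idx : skipped)    /// std::set iterates in ascending order
    {
        if (n < idx)
            break;
        ++n;
    }
    return n;
}

int main()
{
    /// Suppose real argument 0 (e.g. storage_type='azure') was skipped:
    std::set<size_t> skipped{0};
    assert(getRealIndex(skipped, 0) == 1);  /// relative 0 -> real 1
    assert(getRealIndex(skipped, 1) == 2);  /// relative 1 -> real 2
    /// With nothing skipped the mapping is the identity:
    assert(getRealIndex({}, 3) == 3);
}
```

Because the set iterates in ascending order, each skipped index shifts the result by exactly one, so consecutive relative indexes stay consecutive in the unskipped portion of the argument list.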
diff --git a/src/Disks/DiskType.cpp b/src/Disks/DiskType.cpp
index 07a7099419ba..ba767959e991 100644
--- a/src/Disks/DiskType.cpp
+++ b/src/Disks/DiskType.cpp
@@ -9,7 +9,7 @@ namespace ErrorCodes
     extern const int UNKNOWN_ELEMENT_IN_CONFIG;
 }
 
-MetadataStorageType metadataTypeFromString(const String & type)
+MetadataStorageType metadataTypeFromString(const std::string & type)
 {
     auto check_type = Poco::toLower(type);
     if (check_type == "local")
@@ -53,23 +53,47 @@ std::string DataSourceDescription::toString() const
         case DataSourceType::RAM:
             return "memory";
         case DataSourceType::ObjectStorage:
-        {
-            switch (object_storage_type)
-            {
-                case ObjectStorageType::S3:
-                    return "s3";
-                case ObjectStorageType::HDFS:
-                    return "hdfs";
-                case ObjectStorageType::Azure:
-                    return "azure_blob_storage";
-                case ObjectStorageType::Local:
-                    return "local_blob_storage";
-                case ObjectStorageType::Web:
-                    return "web";
-                case ObjectStorageType::None:
-                    return "none";
-            }
-        }
+            return DB::toString(object_storage_type);
     }
 }
+
+ObjectStorageType objectStorageTypeFromString(const std::string & type)
+{
+    auto check_type = Poco::toLower(type);
+    if (check_type == "s3")
+        return ObjectStorageType::S3;
+    if (check_type == "hdfs")
+        return ObjectStorageType::HDFS;
+    if (check_type == "azure_blob_storage" || check_type == "azure")
+        return ObjectStorageType::Azure;
+    if (check_type == "local_blob_storage" || check_type == "local")
+        return ObjectStorageType::Local;
+    if (check_type == "web")
+        return ObjectStorageType::Web;
+    if (check_type == "none")
+        return ObjectStorageType::None;
+
+    throw Exception(ErrorCodes::UNKNOWN_ELEMENT_IN_CONFIG,
+        "Unknown object storage type: {}", type);
+}
+
+std::string toString(ObjectStorageType type)
+{
+    switch (type)
+    {
+        case ObjectStorageType::S3:
+            return "s3";
+        case ObjectStorageType::HDFS:
+            return "hdfs";
+        case ObjectStorageType::Azure:
+            return "azure_blob_storage";
+        case ObjectStorageType::Local:
+            return "local_blob_storage";
+        case ObjectStorageType::Web:
+            return "web";
+        case ObjectStorageType::None:
+            return "none";
+    }
+}
+
 }
diff --git a/src/Disks/DiskType.h b/src/Disks/DiskType.h
index bf7ef3d30eb0..1aa3ea19cbbe 100644
--- a/src/Disks/DiskType.h
+++ b/src/Disks/DiskType.h
@@ -34,8 +34,10 @@ enum class MetadataStorageType : uint8_t
     Memory,
 };
 
-MetadataStorageType metadataTypeFromString(const String & type);
-String toString(DataSourceType data_source_type);
+MetadataStorageType metadataTypeFromString(const std::string & type);
+
+ObjectStorageType objectStorageTypeFromString(const std::string & type);
+std::string toString(ObjectStorageType type);
 
 struct DataSourceDescription
 {
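Note: the inline switch in `DataSourceDescription::toString()` is replaced by the shared `DB::toString(ObjectStorageType)` helper, and `objectStorageTypeFromString()` accepts both the long and the short spellings. A trimmed, standalone sketch (three enum values only, not the real declarations) of the round-trip invariant the refactoring relies on:

```cpp
#include <cassert>
#include <string>

/// Trimmed mirror of the new helpers: fromString(toString(t)) == t must hold
/// for every enum value, while fromString additionally accepts short aliases.
enum class ObjectStorageType { S3, Azure, Local };

std::string toString(ObjectStorageType type)
{
    switch (type)
    {
        case ObjectStorageType::S3: return "s3";
        case ObjectStorageType::Azure: return "azure_blob_storage";
        case ObjectStorageType::Local: return "local_blob_storage";
    }
    return "";
}

ObjectStorageType fromString(const std::string & s)
{
    if (s == "s3") return ObjectStorageType::S3;
    if (s == "azure_blob_storage" || s == "azure") return ObjectStorageType::Azure;
    return ObjectStorageType::Local;
}

int main()
{
    for (auto t : {ObjectStorageType::S3, ObjectStorageType::Azure, ObjectStorageType::Local})
        assert(fromString(toString(t)) == t);             /// round-trip holds
    assert(fromString("azure") == ObjectStorageType::Azure); /// alias accepted
}
```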
diff --git a/src/Parsers/FunctionSecretArgumentsFinder.h b/src/Parsers/FunctionSecretArgumentsFinder.h
index c4f92236aa7e..440ea12e4bf2 100644
--- a/src/Parsers/FunctionSecretArgumentsFinder.h
+++ b/src/Parsers/FunctionSecretArgumentsFinder.h
@@ -3,9 +3,12 @@
 #include
 #include
 #include
+#include <Common/NamedCollections/NamedCollections.h>
+#include <Common/NamedCollections/NamedCollectionsFactory.h>
 #include
 #include
 #include
+#include <set>
 
 namespace DB
@@ -29,6 +32,21 @@ class AbstractFunction
         virtual ~Arguments() = default;
         virtual size_t size() const = 0;
         virtual std::unique_ptr<Argument> at(size_t n) const = 0;
+        void skipArgument(size_t n) { skipped_indexes.insert(n); }
+        void unskipArguments() { skipped_indexes.clear(); }
+        size_t getRealIndex(size_t n) const
+        {
+            for (auto idx : skipped_indexes)
+            {
+                if (n < idx)
+                    break;
+                ++n;
+            }
+            return n;
+        }
+        size_t skippedSize() const { return skipped_indexes.size(); }
+    private:
+        std::set<size_t> skipped_indexes;
     };
 
     virtual ~AbstractFunction() = default;
@@ -75,14 +93,15 @@ class FunctionSecretArgumentsFinder
     {
         if (index >= function->arguments->size())
             return;
+        auto real_index = function->arguments->getRealIndex(index);
         if (!result.count)
         {
-            result.start = index;
+            result.start = real_index;
             result.are_named = argument_is_named;
         }
-        chassert(index >= result.start); /// We always check arguments consecutively
+        chassert(real_index >= result.start); /// We always check arguments consecutively
         chassert(result.replacement.empty()); /// We shouldn't use replacement with masking other arguments
-        result.count = index + 1 - result.start;
+        result.count = real_index + 1 - result.start;
         if (!argument_is_named)
             result.are_named = false;
     }
@@ -100,14 +119,18 @@ class FunctionSecretArgumentsFinder
         {
             findMongoDBSecretArguments();
         }
+        else if (function->name() == "iceberg")
+        {
+            findIcebergFunctionSecretArguments();
+        }
         else if ((function->name() == "s3") || (function->name() == "cosn") || (function->name() == "oss") ||
-                 (function->name() == "deltaLake") || (function->name() == "hudi") || (function->name() == "iceberg") ||
+                 (function->name() == "deltaLake") || (function->name() == "hudi") ||
                  (function->name() == "gcs") || (function->name() == "icebergS3"))
         {
             /// s3('url', 'aws_access_key_id', 'aws_secret_access_key', ...)
             findS3FunctionSecretArguments(/* is_cluster_function= */ false);
         }
-        else if (function->name() == "s3Cluster")
+        else if ((function->name() == "s3Cluster") || (function->name() == "icebergS3Cluster"))
         {
             /// s3Cluster('cluster_name', 'url', 'aws_access_key_id', 'aws_secret_access_key', ...)
             findS3FunctionSecretArguments(/* is_cluster_function= */ true);
@@ -117,7 +140,7 @@ class FunctionSecretArgumentsFinder
             /// azureBlobStorage(connection_string|storage_account_url, container_name, blobpath, account_name, account_key, format, compression, structure)
             findAzureBlobStorageFunctionSecretArguments(/* is_cluster_function= */ false);
         }
-        else if (function->name() == "azureBlobStorageCluster")
+        else if ((function->name() == "azureBlobStorageCluster") || (function->name() == "icebergAzureCluster"))
        {
             /// azureBlobStorageCluster(cluster, connection_string|storage_account_url, container_name, blobpath, [account_name, account_key, format, compression, structure])
             findAzureBlobStorageFunctionSecretArguments(/* is_cluster_function= */ true);
@@ -218,11 +241,18 @@ class FunctionSecretArgumentsFinder
             findSecretNamedArgument("secret_access_key", 1);
             return;
         }
+        if (is_cluster_function && isNamedCollectionName(1))
+        {
+            /// s3Cluster(cluster, named_collection, ..., secret_access_key = 'secret_access_key', ...)
+            findSecretNamedArgument("secret_access_key", 2);
+            return;
+        }
 
         /// We should check other arguments first because we don't need to do any replacement in case of
         /// s3('url', NOSIGN, 'format' [, 'compression'] [, extra_credentials(..)] [, headers(..)])
         /// s3('url', 'format', 'structure' [, 'compression'] [, extra_credentials(..)] [, headers(..)])
         size_t count = excludeS3OrURLNestedMaps();
+
         if ((url_arg_idx + 3 <= count) && (count <= url_arg_idx + 4))
         {
             String second_arg;
@@ -287,6 +317,48 @@ class FunctionSecretArgumentsFinder
         markSecretArgument(url_arg_idx + 4);
     }
 
+    std::string findIcebergStorageType()
+    {
+        std::string storage_type = "s3";
+
+        size_t count = function->arguments->size();
+        if (!count)
+            return storage_type;
+
+        auto storage_type_idx = findNamedArgument(&storage_type, "storage_type");
+        if (storage_type_idx != -1)
+        {
+            storage_type = Poco::toLower(storage_type);
+            function->arguments->skipArgument(storage_type_idx);
+        }
+        else if (isNamedCollectionName(0))
+        {
+            std::string collection_name;
+            if (function->arguments->at(0)->tryGetString(&collection_name, true))
+            {
+                NamedCollectionPtr collection = NamedCollectionFactory::instance().tryGet(collection_name);
+                if (collection && collection->has("storage_type"))
+                {
+                    storage_type = Poco::toLower(collection->get<String>("storage_type"));
+                }
+            }
+        }
+
+        return storage_type;
+    }
+
+    void findIcebergFunctionSecretArguments()
+    {
+        auto storage_type = findIcebergStorageType();
+
+        if (storage_type == "s3")
+            findS3FunctionSecretArguments(false);
+        else if (storage_type == "azure")
+            findAzureBlobStorageFunctionSecretArguments(false);
+
+        function->arguments->unskipArguments();
+    }
+
     bool maskAzureConnectionString(ssize_t url_arg_idx, bool argument_is_named = false, size_t start = 0)
     {
         String url_arg;
@@ -310,7 +382,7 @@ class FunctionSecretArgumentsFinder
             if (RE2::Replace(&url_arg, account_key_pattern, "AccountKey=[HIDDEN]\\1"))
             {
                 chassert(result.count == 0); /// We shouldn't use replacement with masking other arguments
-                result.start = url_arg_idx;
+                result.start = function->arguments->getRealIndex(url_arg_idx);
                 result.are_named = argument_is_named;
                 result.count = 1;
                 result.replacement = url_arg;
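Note on how the new masking path fits together: `findIcebergStorageType()` locates `storage_type` either as a key-value argument (and skips it) or inside a named collection; `findIcebergFunctionSecretArguments()` then delegates to the S3 or Azure finder, which sees a compacted argument list; `unskipArguments()` restores the original indexing afterwards. A toy model of that skip/delegate/unskip flow (plain strings stand in for AST / query-tree nodes):

```cpp
#include <cassert>
#include <set>
#include <string>
#include <vector>

/// Toy model of findIcebergFunctionSecretArguments(): skipping an argument
/// makes the downstream finder see the list as if it were never there.
struct Arguments
{
    std::vector<std::string> args;
    std::set<size_t> skipped;

    size_t size() const { return args.size() - skipped.size(); }
    size_t realIndex(size_t n) const
    {
        for (auto idx : skipped)
        {
            if (n < idx) break;
            ++n;
        }
        return n;
    }
    const std::string & at(size_t n) const { return args[realIndex(n)]; }
};

int main()
{
    /// iceberg(storage_type='azure', conn_string, container, path)
    Arguments a{{"storage_type=azure", "conn_string", "cont", "path"}, {}};
    a.skipped.insert(0);   /// skipArgument(0): hide storage_type

    /// The Azure secret finder now sees the same layout as a plain
    /// azureBlobStorage(conn_string, container, path) call:
    assert(a.size() == 3);
    assert(a.at(0) == "conn_string");

    a.skipped.clear();     /// unskipArguments(): restore for later passes
    assert(a.size() == 4);
}
```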
@@ -458,6 +530,7 @@ class FunctionSecretArgumentsFinder
     void findTableEngineSecretArguments()
     {
         const String & engine_name = function->name();
+
         if (engine_name == "ExternalDistributed")
         {
             /// ExternalDistributed('engine', 'host:port', 'database', 'table', 'user', 'password')
@@ -475,10 +548,13 @@ class FunctionSecretArgumentsFinder
         {
             findMongoDBSecretArguments();
         }
+        else if (engine_name == "Iceberg")
+        {
+            findIcebergTableEngineSecretArguments();
+        }
         else if ((engine_name == "S3") || (engine_name == "COSN") || (engine_name == "OSS") ||
                  (engine_name == "DeltaLake") || (engine_name == "Hudi")
-                 || (engine_name == "Iceberg") || (engine_name == "IcebergS3")
-                 || (engine_name == "S3Queue"))
+                 || (engine_name == "IcebergS3") || (engine_name == "S3Queue"))
         {
             /// S3('url', ['aws_access_key_id', 'aws_secret_access_key',] ...)
             findS3TableEngineSecretArguments();
@@ -487,7 +563,7 @@ class FunctionSecretArgumentsFinder
         {
             findURLSecretArguments();
         }
-        else if (engine_name == "AzureBlobStorage")
+        else if ((engine_name == "AzureBlobStorage") || (engine_name == "IcebergAzure"))
         {
             findAzureBlobStorageTableEngineSecretArguments();
         }
@@ -579,6 +655,18 @@ class FunctionSecretArgumentsFinder
         markSecretArgument(url_arg_idx + 4);
     }
 
+    void findIcebergTableEngineSecretArguments()
+    {
+        auto storage_type = findIcebergStorageType();
+
+        if (storage_type == "s3")
+            findS3TableEngineSecretArguments();
+        else if (storage_type == "azure")
+            findAzureBlobStorageTableEngineSecretArguments();
+
+        function->arguments->unskipArguments();
+    }
+
     void findDatabaseEngineSecretArguments()
     {
         const String & engine_name = function->name();
diff --git a/src/Parsers/FunctionSecretArgumentsFinderAST.h b/src/Parsers/FunctionSecretArgumentsFinderAST.h
index a260c0d58da6..3624d7a7e87b 100644
--- a/src/Parsers/FunctionSecretArgumentsFinderAST.h
+++ b/src/Parsers/FunctionSecretArgumentsFinderAST.h
@@ -54,10 +54,13 @@ class FunctionAST : public AbstractFunction
 {
 public:
     explicit ArgumentsAST(const ASTs * arguments_) : arguments(arguments_) {}
-    size_t size() const override { return arguments ? arguments->size() : 0; }
+    size_t size() const override
+    { /// size without skipped indexes
+        return arguments ? arguments->size() - skippedSize() : 0;
+    }
     std::unique_ptr<Argument> at(size_t n) const override
-    {
-        return std::make_unique<ArgumentAST>(arguments->at(n).get());
+    { /// n is a relative index, some can be skipped
+        return std::make_unique<ArgumentAST>(arguments->at(getRealIndex(n)).get());
     }
 private:
     const ASTs * arguments = nullptr;
diff --git a/src/Storages/ObjectStorage/Azure/Configuration.cpp b/src/Storages/ObjectStorage/Azure/Configuration.cpp
index de4f2d953aaa..2985be6c1fcb 100644
--- a/src/Storages/ObjectStorage/Azure/Configuration.cpp
+++ b/src/Storages/ObjectStorage/Azure/Configuration.cpp
@@ -54,6 +54,7 @@ const std::unordered_set<std::string_view> optional_configuration_keys = {
     "account_key",
     "connection_string",
     "storage_account_url",
+    "storage_type",
 };
 
 void StorageAzureConfiguration::check(ContextPtr context) const
diff --git a/src/Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h b/src/Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h
index 97cb3b5aa1ea..9e3a1b4566f2 100644
--- a/src/Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h
+++ b/src/Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h
@@ -1,6 +1,7 @@
 #pragma once
 
 #include
+#include <Disks/DiskType.h>
 #include
 #include
 #include
@@ -12,7 +13,11 @@
 #include
 #include
 #include
-#include "Storages/ColumnsDescription.h"
+#include <Parsers/ASTFunction.h>
+#include <Parsers/ASTIdentifier.h>
+#include <Parsers/ASTLiteral.h>
+#include <Storages/ColumnsDescription.h>
+#include <Storages/NamedCollectionsHelpers.h>
 #include
 #include
@@ -27,6 +32,8 @@ namespace DB
 namespace ErrorCodes
 {
 extern const int FORMAT_VERSION_TOO_OLD;
+extern const int LOGICAL_ERROR;
+extern const int BAD_ARGUMENTS;
 }
 
 template <typename BaseStorageConfiguration, typename DataLakeMetadata>
 class DataLakeConfiguration : public BaseStorageConfiguration, public std::enable_shared_from_this<DataLakeConfiguration<BaseStorageConfiguration, DataLakeMetadata>>
@@ -251,15 +257,236 @@
 using StorageS3IcebergConfiguration = DataLakeConfiguration<StorageS3Configuration, IcebergMetadata>;
 #endif
 
-#if USE_AZURE_BLOB_STORAGE
+# if USE_AZURE_BLOB_STORAGE
 using StorageAzureIcebergConfiguration = DataLakeConfiguration<StorageAzureConfiguration, IcebergMetadata>;
 #endif
 
-#if USE_HDFS
+# if USE_HDFS
 using StorageHDFSIcebergConfiguration = DataLakeConfiguration<StorageHDFSConfiguration, IcebergMetadata>;
 #endif
 
 using StorageLocalIcebergConfiguration = DataLakeConfiguration<StorageLocalConfiguration, IcebergMetadata>;
+
+/// Detects the storage type from the `storage_type` parameter, if present,
+/// and delegates to the appropriate implementation: S3, Azure, HDFS or Local.
+class StorageIcebergConfiguration : public StorageObjectStorage::Configuration, public std::enable_shared_from_this<StorageIcebergConfiguration>
+{
+    friend class StorageObjectStorage::Configuration;
+
+public:
+    ObjectStorageType getType() const override { return getImpl().getType(); }
+
+    std::string getTypeName() const override { return getImpl().getTypeName(); }
+    std::string getEngineName() const override { return getImpl().getEngineName(); }
+    std::string getNamespaceType() const override { return getImpl().getNamespaceType(); }
+
+    Path getPath() const override { return getImpl().getPath(); }
+    void setPath(const Path & path) override { getImpl().setPath(path); }
+
+    const Paths & getPaths() const override { return getImpl().getPaths(); }
+    void setPaths(const Paths & paths) override { getImpl().setPaths(paths); }
+
+    String getDataSourceDescription() const override { return getImpl().getDataSourceDescription(); }
+    String getNamespace() const override { return getImpl().getNamespace(); }
+
+    StorageObjectStorage::QuerySettings getQuerySettings(const ContextPtr & context) const override
+    { return getImpl().getQuerySettings(context); }
+
+    void addStructureAndFormatToArgsIfNeeded(
+        ASTs & args, const String & structure_, const String & format_, ContextPtr context, bool with_structure) override
+    { getImpl().addStructureAndFormatToArgsIfNeeded(args, structure_, format_, context, with_structure); }
+
+    std::string getPathWithoutGlobs() const override { return getImpl().getPathWithoutGlobs(); }
+
+    bool isArchive() const override { return getImpl().isArchive(); }
+    std::string getPathInArchive() const override { return getImpl().getPathInArchive(); }
+
+    void check(ContextPtr context) const override { getImpl().check(context); }
+    void validateNamespace(const String & name) const override { getImpl().validateNamespace(name); }
+
+    ObjectStoragePtr createObjectStorage(ContextPtr context, bool is_readonly) override
+    { return getImpl().createObjectStorage(context, is_readonly); }
+    StorageObjectStorage::ConfigurationPtr clone() override { return getImpl().clone(); }
+    bool isStaticConfiguration() const override { return getImpl().isStaticConfiguration(); }
+
+    bool isDataLakeConfiguration() const override { return getImpl().isDataLakeConfiguration(); }
+
+    bool hasExternalDynamicMetadata() override { return getImpl().hasExternalDynamicMetadata(); }
+
+    std::shared_ptr<NamesAndTypesList> getInitialSchemaByPath(const String & path) const override
+    { return getImpl().getInitialSchemaByPath(path); }
+
+    std::shared_ptr<const ActionsDAG> getSchemaTransformer(const String & data_path) const override
+    { return getImpl().getSchemaTransformer(data_path); }
+
+    ColumnsDescription updateAndGetCurrentSchema(ObjectStoragePtr object_storage, ContextPtr context) override
+    { return getImpl().updateAndGetCurrentSchema(object_storage, context); }
+
+    ReadFromFormatInfo prepareReadingFromFormat(
+        ObjectStoragePtr object_storage,
+        const Strings & requested_columns,
+        const StorageSnapshotPtr & storage_snapshot,
+        bool supports_subset_of_columns,
+        ContextPtr local_context) override
+    {
+        return getImpl().prepareReadingFromFormat(
+            object_storage,
+            requested_columns,
+            storage_snapshot,
+            supports_subset_of_columns,
+            local_context);
+    }
+
+    std::optional<ColumnsDescription> tryGetTableStructureFromMetadata() const override
+    { return getImpl().tryGetTableStructureFromMetadata(); }
+
+    void update(ObjectStoragePtr object_storage, ContextPtr local_context) override
+    { return getImpl().update(object_storage, local_context); }
+
+    void initialize(
+        ASTs & engine_args,
+        ContextPtr local_context,
+        bool with_table_structure,
+        StorageObjectStorageSettings * settings) override
+    {
+        createDynamicConfiguration(engine_args, local_context);
+        getImpl().initialize(engine_args, local_context, with_table_structure, settings);
+    }
+
+    ASTPtr createArgsWithAccessData() const override
+    {
+        return getImpl().createArgsWithAccessData();
+    }
+
+protected:
+    void fromNamedCollection(const NamedCollection & collection, ContextPtr context) override
+    { return getImpl().fromNamedCollection(collection, context); }
+    void fromAST(ASTs & args, ContextPtr context, bool with_structure) override
+    { return getImpl().fromAST(args, context, with_structure); }
+
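Note: `StorageIcebergConfiguration` is a pure delegating proxy. It holds no storage state itself and forwards every virtual call to an `impl` chosen once the storage type is known; calling through before initialization is a logical error. A condensed sketch of the pattern (names and signatures are illustrative, not the real API):

```cpp
#include <memory>
#include <stdexcept>
#include <string>

/// Condensed sketch of the proxy pattern: the wrapper forwards every call
/// to an implementation created at initialize() time.
struct Configuration
{
    virtual ~Configuration() = default;
    virtual std::string getEngineName() const = 0;
};

struct S3Configuration : Configuration
{
    std::string getEngineName() const override { return "IcebergS3"; }
};

struct IcebergProxy : Configuration
{
    std::string getEngineName() const override { return impl().getEngineName(); }

    void create() { impl_ = std::make_unique<S3Configuration>(); }

private:
    Configuration & impl() const
    {
        if (!impl_)
            throw std::logic_error("storage not initialized"); /// mirrors the LOGICAL_ERROR check
        return *impl_;
    }
    std::unique_ptr<Configuration> impl_;
};

int main()
{
    IcebergProxy proxy;
    proxy.create(); /// in the real class this is driven by storage_type
    return proxy.getEngineName() == "IcebergS3" ? 0 : 1;
}
```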
+    /// Find the storage_type argument and remove it from args if it exists.
+    /// Returns the detected storage type.
+    ObjectStorageType extractDynamicStorageType(ASTs & args, ContextPtr context, ASTPtr * type_arg = nullptr) const override
+    {
+        static const auto storage_type_name = "storage_type";
+
+        if (auto named_collection = tryGetNamedCollectionWithOverrides(args, context))
+        {
+            if (named_collection->has(storage_type_name))
+            {
+                return objectStorageTypeFromString(named_collection->get<String>(storage_type_name));
+            }
+        }
+
+        auto type_it = args.end();
+
+        /// S3 by default for backward compatibility:
+        /// Iceberg without storage_type == IcebergS3
+        ObjectStorageType type = ObjectStorageType::S3;
+
+        for (auto arg_it = args.begin(); arg_it != args.end(); ++arg_it)
+        {
+            const auto * type_ast_function = (*arg_it)->as<ASTFunction>();
+
+            if (type_ast_function && type_ast_function->name == "equals"
+                && type_ast_function->arguments && type_ast_function->arguments->children.size() == 2)
+            {
+                auto name = type_ast_function->arguments->children[0]->as<ASTIdentifier>();
+
+                if (name && name->name() == storage_type_name)
+                {
+                    if (type_it != args.end())
+                    {
+                        throw Exception(
+                            ErrorCodes::BAD_ARGUMENTS,
+                            "DataLake can have only one key-value argument: storage_type='type'.");
+                    }
+
+                    auto value = type_ast_function->arguments->children[1]->as<ASTLiteral>();
+
+                    if (!value)
+                    {
+                        throw Exception(
+                            ErrorCodes::BAD_ARGUMENTS,
+                            "DataLake parameter 'storage_type' has wrong type, string literal expected.");
+                    }
+
+                    if (value->value.getType() != Field::Types::String)
+                    {
+                        throw Exception(
+                            ErrorCodes::BAD_ARGUMENTS,
+                            "DataLake parameter 'storage_type' has wrong value type, string expected.");
+                    }
+
+                    type = objectStorageTypeFromString(value->value.safeGet<String>());
+
+                    type_it = arg_it;
+                }
+            }
+        }
+
+        if (type_it != args.end())
+        {
+            if (type_arg)
+                *type_arg = *type_it;
+            args.erase(type_it);
+        }
+
+        return type;
+    }
+
+    void createDynamicConfiguration(ASTs & args, ContextPtr context)
+    {
+        ObjectStorageType type = extractDynamicStorageType(args, context);
+        createDynamicStorage(type);
+    }
+
+private:
+    inline StorageObjectStorage::Configuration & getImpl() const
+    {
+        if (!impl)
+            throw Exception(ErrorCodes::LOGICAL_ERROR, "Dynamic DataLake storage not initialized");
+
+        return *impl;
+    }
+
+    void createDynamicStorage(ObjectStorageType type)
+    {
+        if (impl)
+        {
+            if (impl->getType() == type)
+                return;
+
+            throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't change datalake engine storage");
+        }
+
+        switch (type)
+        {
+# if USE_AWS_S3
+            case ObjectStorageType::S3:
+                impl = std::make_unique<StorageS3IcebergConfiguration>();
+                break;
+# endif
+# if USE_AZURE_BLOB_STORAGE
+            case ObjectStorageType::Azure:
+                impl = std::make_unique<StorageAzureIcebergConfiguration>();
+                break;
+# endif
+# if USE_HDFS
+            case ObjectStorageType::HDFS:
+                impl = std::make_unique<StorageHDFSIcebergConfiguration>();
+                break;
+# endif
+            case ObjectStorageType::Local:
+                impl = std::make_unique<StorageLocalIcebergConfiguration>();
+                break;
+            default:
+                throw Exception(ErrorCodes::LOGICAL_ERROR, "Unsupported DataLake storage {}", toString(type));
+        }
+    }
+
+    std::shared_ptr<StorageObjectStorage::Configuration> impl;
+};
 #endif
 
 #if USE_PARQUET && USE_AWS_S3
diff --git a/src/Storages/ObjectStorage/S3/Configuration.cpp b/src/Storages/ObjectStorage/S3/Configuration.cpp
index 099a96700f68..7b464cea93b2 100644
--- a/src/Storages/ObjectStorage/S3/Configuration.cpp
+++ b/src/Storages/ObjectStorage/S3/Configuration.cpp
@@ -74,7 +74,8 @@ static const std::unordered_set<std::string_view> optional_configuration_keys =
     "max_single_part_upload_size",
     "max_connections",
     "expiration_window_seconds",
-    "no_sign_request"
+    "no_sign_request",
+    "storage_type",
 };
 
 String StorageS3Configuration::getDataSourceDescription() const
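Note: `extractDynamicStorageType()` works because the parser represents a key-value argument such as `storage_type='s3'` as a function node `equals(identifier, literal)`. A simplified model of the same scan over a hand-rolled tree (`Node` and `extractStorageType` are hypothetical stand-ins for `ASTFunction`/`ASTIdentifier`/`ASTLiteral`):

```cpp
#include <memory>
#include <string>
#include <vector>

/// Simplified model of the scan: find equals(storage_type, <literal>),
/// decode the literal, and erase the node from the argument list so the
/// downstream positional parsing never sees it.
struct Node
{
    std::string kind;   /// "function", "identifier" or "literal"
    std::string value;  /// function name / identifier name / literal text
    std::vector<std::shared_ptr<Node>> children;
};

std::string extractStorageType(std::vector<std::shared_ptr<Node>> & args)
{
    std::string type = "s3"; /// default, for backward compatibility
    for (auto it = args.begin(); it != args.end(); ++it)
    {
        const Node & n = **it;
        if (n.kind == "function" && n.value == "equals" && n.children.size() == 2
            && n.children[0]->kind == "identifier" && n.children[0]->value == "storage_type"
            && n.children[1]->kind == "literal")
        {
            type = n.children[1]->value;
            args.erase(it);
            break;
        }
    }
    return type;
}

int main()
{
    auto eq = std::make_shared<Node>(Node{"function", "equals",
        {std::make_shared<Node>(Node{"identifier", "storage_type", {}}),
         std::make_shared<Node>(Node{"literal", "azure", {}})}});
    std::vector<std::shared_ptr<Node>> args{eq};
    return extractStorageType(args) == "azure" && args.empty() ? 0 : 1;
}
```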
diff --git a/src/Storages/ObjectStorage/StorageObjectStorage.cpp b/src/Storages/ObjectStorage/StorageObjectStorage.cpp
index 33b847086793..97de56e5e2cf 100644
--- a/src/Storages/ObjectStorage/StorageObjectStorage.cpp
+++ b/src/Storages/ObjectStorage/StorageObjectStorage.cpp
@@ -591,42 +591,41 @@ SchemaCache & StorageObjectStorage::getSchemaCache(const ContextPtr & context, c
 }
 
 void StorageObjectStorage::Configuration::initialize(
-    Configuration & configuration,
     ASTs & engine_args,
     ContextPtr local_context,
     bool with_table_structure,
     StorageObjectStorageSettings * settings)
 {
     if (auto named_collection = tryGetNamedCollectionWithOverrides(engine_args, local_context))
-        configuration.fromNamedCollection(*named_collection, local_context);
+        fromNamedCollection(*named_collection, local_context);
     else
-        configuration.fromAST(engine_args, local_context, with_table_structure);
+        fromAST(engine_args, local_context, with_table_structure);
 
-    if (configuration.format == "auto")
+    if (format == "auto")
     {
-        if (configuration.isDataLakeConfiguration())
+        if (isDataLakeConfiguration())
         {
-            configuration.format = "Parquet";
+            format = "Parquet";
         }
         else
         {
-            configuration.format
+            format
                 = FormatFactory::instance()
-                      .tryGetFormatFromFileName(configuration.isArchive() ? configuration.getPathInArchive() : configuration.getPath())
+                      .tryGetFormatFromFileName(isArchive() ? getPathInArchive() : getPath())
                       .value_or("auto");
         }
     }
     else
-        FormatFactory::instance().checkFormatName(configuration.format);
+        FormatFactory::instance().checkFormatName(format);
 
     if (settings)
     {
-        configuration.allow_dynamic_metadata_for_data_lakes
+        allow_dynamic_metadata_for_data_lakes
             = (*settings)[StorageObjectStorageSetting::allow_dynamic_metadata_for_data_lakes];
-        configuration.allow_experimental_delta_kernel_rs
+        allow_experimental_delta_kernel_rs
             = (*settings)[StorageObjectStorageSetting::allow_experimental_delta_kernel_rs];
     }
 
-    configuration.initialized = true;
+    initialized = true;
 }
 
 void StorageObjectStorage::Configuration::check(ContextPtr) const
diff --git a/src/Storages/ObjectStorage/StorageObjectStorage.h b/src/Storages/ObjectStorage/StorageObjectStorage.h
index f63af6222462..ad98b8bfc97d 100644
--- a/src/Storages/ObjectStorage/StorageObjectStorage.h
+++ b/src/Storages/ObjectStorage/StorageObjectStorage.h
@@ -167,8 +167,7 @@ class StorageObjectStorage::Configuration
     using Path = std::string;
     using Paths = std::vector<Path>;
 
-    static void initialize(
-        Configuration & configuration,
+    virtual void initialize(
         ASTs & engine_args,
         ContextPtr local_context,
         bool with_table_structure,
@@ -260,10 +259,12 @@ class StorageObjectStorage::Configuration
         throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method createArgsWithAccessData is not supported by storage {}", getEngineName());
     }
 
-protected:
     virtual void fromNamedCollection(const NamedCollection & collection, ContextPtr context) = 0;
     virtual void fromAST(ASTs & args, ContextPtr context, bool with_structure) = 0;
 
+    virtual ObjectStorageType extractDynamicStorageType(ASTs & /* args */, ContextPtr /* context */, ASTPtr * /* type_arg */ = nullptr) const
+    { return ObjectStorageType::None; }
+
     void assertInitialized() const;
 
     bool initialized = false;
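Note on why `initialize()` changed from a static helper to a virtual member: the dynamic Iceberg configuration has to pick its implementation before any parsing is delegated, which a free-standing `Configuration::initialize(configuration, ...)` cannot dispatch on. A trimmed sketch of the idea (signatures are simplified, not the real ones):

```cpp
#include <memory>
#include <string>

/// Virtual initialize() lets a wrapper choose its implementation first
/// and only then delegate the common parsing path.
struct Configuration
{
    virtual ~Configuration() = default;
    virtual void initialize(const std::string & args) { parsed = args; } /// common path
    std::string parsed;
};

struct IcebergConfiguration : Configuration
{
    void initialize(const std::string & args) override
    {
        if (!impl)
            impl = std::make_unique<Configuration>(); /// pick impl from storage_type first
        impl->initialize(args);                       /// then delegate
    }
    std::unique_ptr<Configuration> impl;
};

int main()
{
    IcebergConfiguration config;
    config.initialize("url, storage_type='s3'");
    return config.impl->parsed.empty() ? 1 : 0;
}
```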
"s3"}, {"Azure", "azureBlobStorage"}, {"HDFS", "hdfs"}, + {"Iceberg", "iceberg"}, {"IcebergS3", "icebergS3"}, {"IcebergAzure", "icebergAzure"}, {"IcebergHDFS", "icebergHDFS"}, @@ -267,6 +268,8 @@ void StorageObjectStorageCluster::updateQueryToSendIfNeeded( configuration->getEngineName()); } + ASTPtr object_storage_type_arg; + configuration->extractDynamicStorageType(args, context, &object_storage_type_arg); if (cluster_name_in_settings) { configuration->addStructureAndFormatToArgsIfNeeded(args, structure, configuration->format, context, /*with_structure=*/true); @@ -278,6 +281,8 @@ void StorageObjectStorageCluster::updateQueryToSendIfNeeded( configuration->addStructureAndFormatToArgsIfNeeded(args, structure, configuration->format, context, /*with_structure=*/true); args.insert(args.begin(), cluster_name_arg); } + if (object_storage_type_arg) + args.insert(args.end(), object_storage_type_arg); } RemoteQueryExecutor::Extension StorageObjectStorageCluster::getTaskIteratorExtension( diff --git a/src/Storages/ObjectStorage/registerStorageObjectStorage.cpp b/src/Storages/ObjectStorage/registerStorageObjectStorage.cpp index 12567d0c5893..7e3bca85ef2f 100644 --- a/src/Storages/ObjectStorage/registerStorageObjectStorage.cpp +++ b/src/Storages/ObjectStorage/registerStorageObjectStorage.cpp @@ -46,7 +46,7 @@ createStorageObjectStorage(const StorageFactory::Arguments & args, StorageObject auto cluster_name = (*queue_settings)[StorageObjectStorageSetting::object_storage_cluster].value; - StorageObjectStorage::Configuration::initialize(*configuration, args.engine_args, context, false, queue_settings.get()); + configuration->initialize(args.engine_args, context, false, queue_settings.get()); // Use format settings from global server context + settings from // the SETTINGS clause of the create query. Settings from current @@ -179,21 +179,21 @@ void registerStorageObjectStorage(StorageFactory & factory) void registerStorageIceberg(StorageFactory & factory) { -#if USE_AWS_S3 factory.registerStorage( "Iceberg", [&](const StorageFactory::Arguments & args) { - auto configuration = std::make_shared(); + auto configuration = std::make_shared(); return createStorageObjectStorage(args, configuration); }, { .supports_settings = true, .supports_schema_inference = true, - .source_access_type = AccessType::S3, + .source_access_type = AccessType::NONE, .has_builtin_setting_fn = StorageObjectStorageSettings::hasBuiltin, }); +# if USE_AWS_S3 factory.registerStorage( "IcebergS3", [&](const StorageFactory::Arguments & args) diff --git a/src/Storages/ObjectStorageQueue/registerQueueStorage.cpp b/src/Storages/ObjectStorageQueue/registerQueueStorage.cpp index 664ee7ca6c80..61bdc4c076b0 100644 --- a/src/Storages/ObjectStorageQueue/registerQueueStorage.cpp +++ b/src/Storages/ObjectStorageQueue/registerQueueStorage.cpp @@ -32,7 +32,7 @@ StoragePtr createQueueStorage(const StorageFactory::Arguments & args) throw Exception(ErrorCodes::BAD_ARGUMENTS, "External data source must have arguments"); auto configuration = std::make_shared(); - StorageObjectStorage::Configuration::initialize(*configuration, args.engine_args, args.getContext(), false, nullptr); + configuration->initialize(args.engine_args, args.getContext(), false, nullptr); // Use format settings from global server context + settings from // the SETTINGS clause of the create query. 
diff --git a/src/Storages/ObjectStorageQueue/registerQueueStorage.cpp b/src/Storages/ObjectStorageQueue/registerQueueStorage.cpp
index 664ee7ca6c80..61bdc4c076b0 100644
--- a/src/Storages/ObjectStorageQueue/registerQueueStorage.cpp
+++ b/src/Storages/ObjectStorageQueue/registerQueueStorage.cpp
@@ -32,7 +32,7 @@ StoragePtr createQueueStorage(const StorageFactory::Arguments & args)
         throw Exception(ErrorCodes::BAD_ARGUMENTS, "External data source must have arguments");
 
     auto configuration = std::make_shared();
-    StorageObjectStorage::Configuration::initialize(*configuration, args.engine_args, args.getContext(), false, nullptr);
+    configuration->initialize(args.engine_args, args.getContext(), false, nullptr);
 
     // Use format settings from global server context + settings from
     // the SETTINGS clause of the create query. Settings from current
diff --git a/src/TableFunctions/TableFunctionObjectStorage.cpp b/src/TableFunctions/TableFunctionObjectStorage.cpp
index 62d017c854d6..564f4fa34248 100644
--- a/src/TableFunctions/TableFunctionObjectStorage.cpp
+++ b/src/TableFunctions/TableFunctionObjectStorage.cpp
@@ -193,6 +193,10 @@
 template class TableFunctionObjectStorage;
 
+#if USE_AVRO
+template class TableFunctionObjectStorage<IcebergDefinition, StorageIcebergConfiguration>;
+#endif
+
 #if USE_AVRO && USE_AWS_S3
 template class TableFunctionObjectStorage<IcebergS3Definition, StorageS3IcebergConfiguration>;
 #endif
@@ -216,14 +220,6 @@ template class TableFunctionObjectStorage
-#if USE_AVRO && USE_AWS_S3
-    factory.registerFunction<TableFunctionIceberg>(
-        {.documentation
-         = {.description = R"(The table function can be used to read the Iceberg table stored on S3 object store. Alias to icebergS3)",
-            .examples{{"iceberg", "SELECT * FROM iceberg(url, access_key_id, secret_access_key)", ""}},
-            .category{""}},
-         .allow_readonly = false});
-#endif
     factory.registerFunction<TableFunctionIcebergLocal>(
         {.documentation
         = {.description = R"(The table function can be used to read the Iceberg table stored locally.)",
diff --git a/src/TableFunctions/TableFunctionObjectStorage.h b/src/TableFunctions/TableFunctionObjectStorage.h
index bdd82474c340..2a0e6ce35748 100644
--- a/src/TableFunctions/TableFunctionObjectStorage.h
+++ b/src/TableFunctions/TableFunctionObjectStorage.h
@@ -65,7 +65,7 @@ struct LocalDefinition
 struct IcebergDefinition
 {
     static constexpr auto name = "iceberg";
-    static constexpr auto storage_type_name = "S3";
+    static constexpr auto storage_type_name = "UNDEFINED";
 };
 
 struct IcebergS3Definition
@@ -130,7 +130,7 @@ class TableFunctionObjectStorage : public ITableFunction
 
     virtual void parseArgumentsImpl(ASTs & args, const ContextPtr & context)
     {
-        StorageObjectStorage::Configuration::initialize(*getConfiguration(), args, context, true, &settings);
+        getConfiguration()->initialize(args, context, true, &settings);
     }
 
     static void updateStructureAndFormatArgumentsIfNeeded(
@@ -184,8 +184,9 @@
 using TableFunctionLocal = TableFunctionObjectStorage;
 
+using TableFunctionIceberg = TableFunctionObjectStorage<IcebergDefinition, StorageIcebergConfiguration>;
+
# if USE_AWS_S3
-using TableFunctionIceberg = TableFunctionObjectStorage<IcebergDefinition, StorageS3IcebergConfiguration>;
 using TableFunctionIcebergS3 = TableFunctionObjectStorage<IcebergS3Definition, StorageS3IcebergConfiguration>;
# endif
diff --git a/src/TableFunctions/TableFunctionObjectStorageCluster.cpp b/src/TableFunctions/TableFunctionObjectStorageCluster.cpp
index a96dc6f089e0..cd6159bfdf31 100644
--- a/src/TableFunctions/TableFunctionObjectStorageCluster.cpp
+++ b/src/TableFunctions/TableFunctionObjectStorageCluster.cpp
@@ -116,32 +116,49 @@ void registerTableFunctionIcebergCluster(TableFunctionFactory & factory)
 {
     UNUSED(factory);
 
-#if USE_AWS_S3
+    factory.registerFunction<TableFunctionIcebergCluster>(
+        {.documentation
+         = {.description = R"(The table function can be used to read the Iceberg table stored on any object store in parallel for many nodes in a specified cluster.)",
+            .examples{
+# if USE_AWS_S3
+                {"icebergCluster", "SELECT * FROM icebergCluster(cluster, url, [, NOSIGN | access_key_id, secret_access_key, [session_token]], format, [,compression], storage_type='s3')", ""},
+# endif
+# if USE_AZURE_BLOB_STORAGE
+                {"icebergCluster", "SELECT * FROM icebergCluster(cluster, connection_string|storage_account_url, container_name, blobpath, [account_name, account_key, format, compression], storage_type='azure')", ""},
+# endif
+# if USE_HDFS
+                {"icebergCluster", "SELECT * FROM icebergCluster(cluster, uri, [format], [structure], [compression_method], storage_type='hdfs')", ""},
+# endif
+            },
+            .category{""}},
+         .allow_readonly = false});
+
+# if USE_AWS_S3
     factory.registerFunction<TableFunctionIcebergS3Cluster>(
         {.documentation
          = {.description = R"(The table function can be used to read the Iceberg table stored on S3 object store in parallel for many nodes in a specified cluster.)",
            .examples{{"icebergS3Cluster", "SELECT * FROM icebergS3Cluster(cluster, url, [, NOSIGN | access_key_id, secret_access_key, [session_token]], format, [,compression])", ""}},
            .category{""}},
         .allow_readonly = false});
-#endif
+# endif
 
-#if USE_AZURE_BLOB_STORAGE
+# if USE_AZURE_BLOB_STORAGE
     factory.registerFunction<TableFunctionIcebergAzureCluster>(
         {.documentation
          = {.description = R"(The table function can be used to read the Iceberg table stored on Azure object store in parallel for many nodes in a specified cluster.)",
            .examples{{"icebergAzureCluster", "SELECT * FROM icebergAzureCluster(cluster, connection_string|storage_account_url, container_name, blobpath, [account_name, account_key, format, compression])", ""}},
            .category{""}},
         .allow_readonly = false});
-#endif
+# endif
 
-#if USE_HDFS
+# if USE_HDFS
     factory.registerFunction<TableFunctionIcebergHDFSCluster>(
         {.documentation
          = {.description = R"(The table function can be used to read the Iceberg table stored on HDFS virtual filesystem in parallel for many nodes in a specified cluster.)",
            .examples{{"icebergHDFSCluster", "SELECT * FROM icebergHDFSCluster(cluster, uri, [format], [structure], [compression_method])", ""}},
            .category{""}},
         .allow_readonly = false});
-#endif
+# endif
 }
 
 #endif
diff --git a/src/TableFunctions/TableFunctionObjectStorageCluster.h b/src/TableFunctions/TableFunctionObjectStorageCluster.h
index d30ea6aa1e03..a363e9a81c2d 100644
--- a/src/TableFunctions/TableFunctionObjectStorageCluster.h
+++ b/src/TableFunctions/TableFunctionObjectStorageCluster.h
@@ -31,6 +31,12 @@ struct HDFSClusterDefinition
     static constexpr auto storage_type_name = "HDFSCluster";
 };
 
+struct IcebergClusterDefinition
+{
+    static constexpr auto name = "icebergCluster";
+    static constexpr auto storage_type_name = "UNDEFINED";
+};
+
 struct IcebergS3ClusterDefinition
 {
     static constexpr auto name = "icebergS3Cluster";
@@ -108,6 +114,8 @@ using TableFunctionAzureBlobCluster = TableFunctionObjectStorageCluster
 #endif
 
+using TableFunctionIcebergCluster = TableFunctionObjectStorageCluster<IcebergClusterDefinition, StorageIcebergConfiguration>;
+
 #if USE_AVRO && USE_AWS_S3
 using TableFunctionIcebergS3Cluster = TableFunctionObjectStorageCluster<IcebergS3ClusterDefinition, StorageS3IcebergConfiguration>;
 #endif
diff --git a/src/TableFunctions/TableFunctionObjectStorageClusterFallback.cpp b/src/TableFunctions/TableFunctionObjectStorageClusterFallback.cpp
index 6fe9f6853dce..f922781a8c8f 100644
--- a/src/TableFunctions/TableFunctionObjectStorageClusterFallback.cpp
+++ b/src/TableFunctions/TableFunctionObjectStorageClusterFallback.cpp
@@ -38,6 +38,13 @@ struct HDFSClusterFallbackDefinition
     static constexpr auto storage_type_cluster_name = "HDFSCluster";
 };
 
+struct IcebergClusterFallbackDefinition
+{
+    static constexpr auto name = "iceberg";
+    static constexpr auto storage_type_name = "UNDEFINED";
+    static constexpr auto storage_type_cluster_name = "IcebergCluster";
+};
+
 struct IcebergS3ClusterFallbackDefinition
 {
     static constexpr auto name = "icebergS3";
@@ -129,6 +136,10 @@ using TableFunctionAzureClusterFallback = TableFunctionObjectStorageClusterFallb
 using TableFunctionHDFSClusterFallback = TableFunctionObjectStorageClusterFallback<HDFSClusterFallbackDefinition, TableFunctionHDFSCluster>;
 #endif
 
+#if USE_AVRO
+using TableFunctionIcebergClusterFallback = TableFunctionObjectStorageClusterFallback<IcebergClusterFallbackDefinition, TableFunctionIcebergCluster>;
+#endif
+
 #if USE_AVRO && USE_AWS_S3
 using TableFunctionIcebergS3ClusterFallback = TableFunctionObjectStorageClusterFallback<IcebergS3ClusterFallbackDefinition, TableFunctionIcebergS3Cluster>;
 #endif
@@ -213,6 +224,36 @@ void registerTableFunctionObjectStorageClusterFallback(TableFunctionFactory & fa
     );
 #endif
 
+#if USE_AVRO
+    factory.registerFunction<TableFunctionIcebergClusterFallback>(
+    {
+        .documentation = {
+            .description=R"(The table function can be used to read the Iceberg table stored on different object stores in parallel for many nodes in a specified cluster or from a single node.)",
+            .examples{
+                {
+                    "iceberg",
+                    "SELECT * FROM iceberg(url, access_key_id, secret_access_key, storage_type='s3')", ""
+                },
+                {
+                    "iceberg",
+                    "SELECT * FROM iceberg(url, access_key_id, secret_access_key, storage_type='s3') "
+                    "SETTINGS object_storage_cluster='cluster'", ""
+                },
+                {
+                    "iceberg",
+                    "SELECT * FROM iceberg(url, access_key_id, secret_access_key, storage_type='azure')", ""
+                },
+                {
+                    "iceberg",
+                    "SELECT * FROM iceberg(url, storage_type='hdfs') SETTINGS object_storage_cluster='cluster'", ""
+                },
+            }
+        },
+        .allow_readonly = false
+    }
+    );
+#endif
+
 #if USE_AVRO && USE_AWS_S3
     factory.registerFunction<TableFunctionIcebergS3ClusterFallback>(
     {
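Note: the fallback registration ties the plain `iceberg` table function to `IcebergCluster` through the compile-time definition structs, so setting `object_storage_cluster` switches execution to the cluster variant. A sketch of that pattern; `resolveEngine()` is a hypothetical illustration of the dispatch, while the struct's member values are copied from this diff:

```cpp
#include <iostream>
#include <string>

/// One definition struct per table function supplies the single-node name
/// and the cluster engine to fall back to.
struct IcebergClusterFallbackDefinition
{
    static constexpr auto name = "iceberg";
    static constexpr auto storage_type_cluster_name = "IcebergCluster";
};

template <typename Definition>
std::string resolveEngine(const std::string & object_storage_cluster)
{
    /// Empty setting -> run on the initiator only; otherwise use the
    /// cluster variant of the same table function.
    return object_storage_cluster.empty() ? Definition::name
                                          : Definition::storage_type_cluster_name;
}

int main()
{
    std::cout << resolveEngine<IcebergClusterFallbackDefinition>("") << '\n';           /// iceberg
    std::cout << resolveEngine<IcebergClusterFallbackDefinition>("my_cluster") << '\n'; /// IcebergCluster
}
```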
f"Iceberg(storage_type='azure', '{azure_storage_account_url}', 'cont', 'test_simple.csv', '{azure_account_name}', '{azure_account_key}')", + "FILE_DOESNT_EXIST", + ), + ( + f"Iceberg(storage_type='azure', named_collection_2, connection_string = '{azure_conn_string}', container = 'cont', blob_path = 'test_simple_7.csv', format = 'CSV')", + "FILE_DOESNT_EXIST", + ), + ( + f"Iceberg(storage_type='azure', named_collection_2, storage_account_url = '{azure_storage_account_url}', container = 'cont', blob_path = 'test_simple_8.csv', account_name = '{azure_account_name}', account_key = '{azure_account_key}')", + "FILE_DOESNT_EXIST", + ), ] def make_test_case(i): @@ -344,11 +383,22 @@ def make_test_case(i): "CREATE TABLE table20 (`x` int) ENGINE = S3Queue('http://minio1:9001/root/data/', 'minio', '[HIDDEN]', 'CSV', 'gzip') SETTINGS mode = 'ordered'", "CREATE TABLE table21 (`x` int) ENGINE = Iceberg('http://minio1:9001/root/data/test11.csv.gz', 'minio', '[HIDDEN]')", "CREATE TABLE table22 (`x` int) ENGINE = IcebergS3('http://minio1:9001/root/data/test11.csv.gz', 'minio', '[HIDDEN]')", - f"CREATE TABLE table23 (`x` int) ENGINE = AzureBlobStorage('{masked_azure_conn_string}', 'cont', 'test_simple.csv', 'CSV')", - f"CREATE TABLE table24 (`x` int) ENGINE = AzureBlobStorage('{masked_azure_conn_string}', 'cont', 'test_simple_1.csv', 'CSV', 'none')", - f"CREATE TABLE table25 (`x` int) ENGINE = AzureBlobStorage('{azure_storage_account_url}', 'cont', 'test_simple_2.csv', '{azure_account_name}', '[HIDDEN]')", - f"CREATE TABLE table26 (`x` int) ENGINE = AzureBlobStorage('{azure_storage_account_url}', 'cont', 'test_simple_3.csv', '{azure_account_name}', '[HIDDEN]', 'CSV')", - f"CREATE TABLE table27 (`x` int) ENGINE = AzureBlobStorage('{azure_storage_account_url}', 'cont', 'test_simple_4.csv', '{azure_account_name}', '[HIDDEN]', 'CSV', 'none')", + "CREATE TABLE table23 (`x` int) ENGINE = Iceberg(storage_type = 's3', 'http://minio1:9001/root/data/test11.csv.gz', 'minio', '[HIDDEN]')", + f"CREATE TABLE table24 (`x` int) ENGINE = AzureBlobStorage('{masked_azure_conn_string}', 'cont', 'test_simple.csv', 'CSV')", + f"CREATE TABLE table25 (`x` int) ENGINE = AzureBlobStorage('{masked_azure_conn_string}', 'cont', 'test_simple_1.csv', 'CSV', 'none')", + f"CREATE TABLE table26 (`x` int) ENGINE = AzureBlobStorage('{azure_storage_account_url}', 'cont', 'test_simple_3.csv', '{azure_account_name}', '[HIDDEN]')", + f"CREATE TABLE table27 (`x` int) ENGINE = AzureBlobStorage('{azure_storage_account_url}', 'cont', 'test_simple_4.csv', '{azure_account_name}', '[HIDDEN]', 'CSV')", + f"CREATE TABLE table28 (`x` int) ENGINE = AzureBlobStorage('{azure_storage_account_url}', 'cont', 'test_simple_5.csv', '{azure_account_name}', '[HIDDEN]', 'CSV', 'none')", + f"CREATE TABLE table29 (`x` int) ENGINE = AzureBlobStorage(named_collection_2, connection_string = '{masked_azure_conn_string}', container = 'cont', blob_path = 'test_simple_7.csv', format = 'CSV')", + f"CREATE TABLE table30 (`x` int) ENGINE = AzureBlobStorage(named_collection_2, storage_account_url = '{azure_storage_account_url}', container = 'cont', blob_path = 'test_simple_8.csv', account_name = '{azure_account_name}', account_key = '[HIDDEN]')", + f"CREATE TABLE table31 (`x` int) ENGINE = IcebergAzure('{masked_azure_conn_string}', 'cont', 'test_simple.csv')", + f"CREATE TABLE table32 (`x` int) ENGINE = IcebergAzure('{azure_storage_account_url}', 'cont', 'test_simple.csv', '{azure_account_name}', '[HIDDEN]')", + f"CREATE TABLE table33 (`x` int) ENGINE = 
IcebergAzure(named_collection_2, connection_string = '{masked_azure_conn_string}', container = 'cont', blob_path = 'test_simple_7.csv', format = 'CSV')", + f"CREATE TABLE table34 (`x` int) ENGINE = IcebergAzure(named_collection_2, storage_account_url = '{azure_storage_account_url}', container = 'cont', blob_path = 'test_simple_8.csv', account_name = '{azure_account_name}', account_key = '[HIDDEN]')", + f"CREATE TABLE table35 (`x` int) ENGINE = Iceberg(storage_type = 'azure', '{masked_azure_conn_string}', 'cont', 'test_simple.csv')", + f"CREATE TABLE table36 (`x` int) ENGINE = Iceberg(storage_type = 'azure', '{azure_storage_account_url}', 'cont', 'test_simple.csv', '{azure_account_name}', '[HIDDEN]')", + f"CREATE TABLE table37 (`x` int) ENGINE = Iceberg(storage_type = 'azure', named_collection_2, connection_string = '{masked_azure_conn_string}', container = 'cont', blob_path = 'test_simple_7.csv', format = 'CSV')", + f"CREATE TABLE table38 (`x` int) ENGINE = Iceberg(storage_type = 'azure', named_collection_2, storage_account_url = '{azure_storage_account_url}', container = 'cont', blob_path = 'test_simple_8.csv', account_name = '{azure_account_name}', account_key = '[HIDDEN]')", ], must_not_contain=[password], ) @@ -434,6 +484,9 @@ def test_table_functions(): f"s3('http://minio1:9001/root/data/test6.csv', 'minio', '{password}', 'CSV')", f"s3('http://minio1:9001/root/data/test7.csv', 'minio', '{password}', 'CSV', 'x int')", f"s3('http://minio1:9001/root/data/test8.csv.gz', 'minio', '{password}', 'CSV', 'x int', 'gzip')", + f"s3Cluster('test_shard_localhost', 'http://minio1:9001/root/data/test1.csv', 'minio', '{password}')", + f"s3Cluster('test_shard_localhost', 'http://minio1:9001/root/data/test2.csv', 'CSV', 'x int')", + f"s3Cluster('test_shard_localhost', 'http://minio1:9001/root/data/test3.csv', 'minio', '{password}', 'CSV')", f"remote('127.{{2..11}}', default.remote_table)", f"remote('127.{{2..11}}', default.remote_table, rand())", f"remote('127.{{2..11}}', default.remote_table, 'remote_user')", @@ -462,10 +515,40 @@ def test_table_functions(): f"azureBlobStorage('{azure_storage_account_url}', 'cont', 'test_simple_6.csv', '{azure_account_name}', '{azure_account_key}', 'CSV', 'none', 'auto')", f"azureBlobStorage(named_collection_2, connection_string = '{azure_conn_string}', container = 'cont', blob_path = 'test_simple_7.csv', format = 'CSV')", f"azureBlobStorage(named_collection_2, storage_account_url = '{azure_storage_account_url}', container = 'cont', blob_path = 'test_simple_8.csv', account_name = '{azure_account_name}', account_key = '{azure_account_key}')", + f"azureBlobStorageCluster('test_shard_localhost', '{azure_conn_string}', 'cont', 'test_simple.csv', 'CSV')", + f"azureBlobStorageCluster('test_shard_localhost', '{azure_conn_string}', 'cont', 'test_simple_1.csv', 'CSV', 'none')", + f"azureBlobStorageCluster('test_shard_localhost', '{azure_conn_string}', 'cont', 'test_simple_2.csv', 'CSV', 'none', 'auto')", + f"azureBlobStorageCluster('test_shard_localhost', '{azure_storage_account_url}', 'cont', 'test_simple_3.csv', '{azure_account_name}', '{azure_account_key}')", + f"azureBlobStorageCluster('test_shard_localhost', '{azure_storage_account_url}', 'cont', 'test_simple_4.csv', '{azure_account_name}', '{azure_account_key}', 'CSV')", + f"azureBlobStorageCluster('test_shard_localhost', '{azure_storage_account_url}', 'cont', 'test_simple_5.csv', '{azure_account_name}', '{azure_account_key}', 'CSV', 'none')", + f"azureBlobStorageCluster('test_shard_localhost', 
'{azure_storage_account_url}', 'cont', 'test_simple_6.csv', '{azure_account_name}', '{azure_account_key}', 'CSV', 'none', 'auto')", + f"azureBlobStorageCluster('test_shard_localhost', named_collection_2, connection_string = '{azure_conn_string}', container = 'cont', blob_path = 'test_simple_7.csv', format = 'CSV')", + f"azureBlobStorageCluster('test_shard_localhost', named_collection_2, storage_account_url = '{azure_storage_account_url}', container = 'cont', blob_path = 'test_simple_8.csv', account_name = '{azure_account_name}', account_key = '{azure_account_key}')", f"iceberg('http://minio1:9001/root/data/test11.csv.gz', 'minio', '{password}')", - f"gcs('http://minio1:9001/root/data/test11.csv.gz', 'minio', '{password}')", + f"iceberg(named_collection_2, url = 'http://minio1:9001/root/data/test4.csv', access_key_id = 'minio', secret_access_key = '{password}')", f"icebergS3('http://minio1:9001/root/data/test11.csv.gz', 'minio', '{password}')", + f"icebergS3(named_collection_2, url = 'http://minio1:9001/root/data/test4.csv', access_key_id = 'minio', secret_access_key = '{password}')", + f"icebergAzure('{azure_conn_string}', 'cont', 'test_simple.csv')", + f"icebergAzure('{azure_storage_account_url}', 'cont', 'test_simple.csv', '{azure_account_name}', '{azure_account_key}')", f"icebergAzure('{azure_storage_account_url}', 'cont', 'test_simple_6.csv', '{azure_account_name}', '{azure_account_key}', 'CSV', 'none', 'auto')", + f"icebergAzure(named_collection_2, connection_string = '{azure_conn_string}', container = 'cont', blob_path = 'test_simple_7.csv', format = 'CSV')", + f"icebergAzure(named_collection_2, storage_account_url = '{azure_storage_account_url}', container = 'cont', blob_path = 'test_simple_8.csv', account_name = '{azure_account_name}', account_key = '{azure_account_key}')", + f"icebergS3Cluster('test_shard_localhost', 'http://minio1:9001/root/data/test11.csv.gz', 'minio', '{password}')", + f"icebergS3Cluster('test_shard_localhost', named_collection_2, url = 'http://minio1:9001/root/data/test4.csv', access_key_id = 'minio', secret_access_key = '{password}')", + f"icebergAzureCluster('test_shard_localhost', '{azure_conn_string}', 'cont', 'test_simple.csv')", + f"icebergAzureCluster('test_shard_localhost', '{azure_storage_account_url}', 'cont', 'test_simple.csv', '{azure_account_name}', '{azure_account_key}')", + f"icebergAzureCluster('test_shard_localhost', '{azure_storage_account_url}', 'cont', 'test_simple_6.csv', '{azure_account_name}', '{azure_account_key}', 'CSV', 'none', 'auto')", + f"icebergAzureCluster('test_shard_localhost', named_collection_2, connection_string = '{azure_conn_string}', container = 'cont', blob_path = 'test_simple_7.csv', format = 'CSV')", + f"icebergAzureCluster('test_shard_localhost', named_collection_2, storage_account_url = '{azure_storage_account_url}', container = 'cont', blob_path = 'test_simple_8.csv', account_name = '{azure_account_name}', account_key = '{azure_account_key}')", + f"iceberg(storage_type='s3', 'http://minio1:9001/root/data/test11.csv.gz', 'minio', '{password}')", + f"iceberg(storage_type='s3', named_collection_2, url = 'http://minio1:9001/root/data/test4.csv', access_key_id = 'minio', secret_access_key = '{password}')", + f"iceberg(storage_type='azure', '{azure_conn_string}', 'cont', 'test_simple.csv')", + f"iceberg(storage_type='azure', '{azure_storage_account_url}', 'cont', 'test_simple.csv', '{azure_account_name}', '{azure_account_key}')", + f"iceberg(storage_type='azure', '{azure_storage_account_url}', 'cont', 'test_simple_6.csv', 
'{azure_account_name}', '{azure_account_key}', 'CSV', 'none', 'auto')", + f"iceberg(storage_type='azure', named_collection_2, connection_string = '{azure_conn_string}', container = 'cont', blob_path = 'test_simple_7.csv', format = 'CSV')", + f"iceberg(storage_type='azure', named_collection_2, storage_account_url = '{azure_storage_account_url}', container = 'cont', blob_path = 'test_simple_8.csv', account_name = '{azure_account_name}', account_key = '{azure_account_key}')", + f"gcs('http://minio1:9001/root/data/test11.csv.gz', 'minio', '{password}')", + ] def make_test_case(i): @@ -517,38 +600,70 @@ def make_test_case(i): "CREATE TABLE tablefunc8 (`x` int) AS s3('http://minio1:9001/root/data/test6.csv', 'minio', '[HIDDEN]', 'CSV')", "CREATE TABLE tablefunc9 (`x` int) AS s3('http://minio1:9001/root/data/test7.csv', 'minio', '[HIDDEN]', 'CSV', 'x int')", "CREATE TABLE tablefunc10 (`x` int) AS s3('http://minio1:9001/root/data/test8.csv.gz', 'minio', '[HIDDEN]', 'CSV', 'x int', 'gzip')", - "CREATE TABLE tablefunc11 (x int) AS remote('127.{2..11}', default.remote_table)", - "CREATE TABLE tablefunc12 (x int) AS remote('127.{2..11}', default.remote_table, rand())", - "CREATE TABLE tablefunc13 (x int) AS remote('127.{2..11}', default.remote_table, 'remote_user')", - "CREATE TABLE tablefunc14 (`x` int) AS remote('127.{2..11}', default.remote_table, 'remote_user', '[HIDDEN]')", - "CREATE TABLE tablefunc15 (x int) AS remote('127.{2..11}', default.remote_table, 'remote_user', rand())", - "CREATE TABLE tablefunc16 (`x` int) AS remote('127.{2..11}', default.remote_table, 'remote_user', '[HIDDEN]', rand())", - "CREATE TABLE tablefunc17 (`x` int) AS remote('127.{2..11}', 'default.remote_table', 'remote_user', '[HIDDEN]', rand())", - "CREATE TABLE tablefunc18 (`x` int) AS remote('127.{2..11}', 'default', 'remote_table', 'remote_user', '[HIDDEN]', rand())", - "CREATE TABLE tablefunc19 (`x` int) AS remote('127.{2..11}', numbers(10), 'remote_user', '[HIDDEN]', rand())", - "CREATE TABLE tablefunc20 (`x` int) AS remoteSecure('127.{2..11}', 'default', 'remote_table', 'remote_user', '[HIDDEN]')", - "CREATE TABLE tablefunc21 (x int) AS remoteSecure('127.{2..11}', 'default', 'remote_table', 'remote_user', rand())", - "CREATE TABLE tablefunc22 (`x` int) AS mysql(named_collection_1, host = 'mysql80', port = 3306, database = 'mysql_db', `table` = 'mysql_table', user = 'mysql_user', password = '[HIDDEN]')", - "CREATE TABLE tablefunc23 (`x` int) AS postgresql(named_collection_2, password = '[HIDDEN]', host = 'postgres1', port = 5432, database = 'postgres_db', `table` = 'postgres_table', user = 'postgres_user')", - "CREATE TABLE tablefunc24 (`x` int) AS s3(named_collection_2, url = 'http://minio1:9001/root/data/test4.csv', access_key_id = 'minio', secret_access_key = '[HIDDEN]')", - "CREATE TABLE tablefunc25 (`x` int) AS remote(named_collection_6, addresses_expr = '127.{2..11}', database = 'default', `table` = 'remote_table', user = 'remote_user', password = '[HIDDEN]', sharding_key = rand())", - "CREATE TABLE tablefunc26 (`x` int) AS remoteSecure(named_collection_6, addresses_expr = '127.{2..11}', database = 'default', `table` = 'remote_table', user = 'remote_user', password = '[HIDDEN]')", - "CREATE TABLE tablefunc27 (x int) AS s3('http://minio1:9001/root/data/test9.csv.gz', 'NOSIGN', 'CSV')", - "CREATE TABLE tablefunc28 (`x` int) AS s3('http://minio1:9001/root/data/test10.csv.gz', 'minio', '[HIDDEN]')", - "CREATE TABLE tablefunc29 (`x` int) AS deltaLake('http://minio1:9001/root/data/test11.csv.gz', 'minio', 
@@ -517,38 +600,70 @@ def make_test_case(i):
         "CREATE TABLE tablefunc8 (`x` int) AS s3('http://minio1:9001/root/data/test6.csv', 'minio', '[HIDDEN]', 'CSV')",
         "CREATE TABLE tablefunc9 (`x` int) AS s3('http://minio1:9001/root/data/test7.csv', 'minio', '[HIDDEN]', 'CSV', 'x int')",
         "CREATE TABLE tablefunc10 (`x` int) AS s3('http://minio1:9001/root/data/test8.csv.gz', 'minio', '[HIDDEN]', 'CSV', 'x int', 'gzip')",
-        "CREATE TABLE tablefunc11 (x int) AS remote('127.{2..11}', default.remote_table)",
-        "CREATE TABLE tablefunc12 (x int) AS remote('127.{2..11}', default.remote_table, rand())",
-        "CREATE TABLE tablefunc13 (x int) AS remote('127.{2..11}', default.remote_table, 'remote_user')",
-        "CREATE TABLE tablefunc14 (`x` int) AS remote('127.{2..11}', default.remote_table, 'remote_user', '[HIDDEN]')",
-        "CREATE TABLE tablefunc15 (x int) AS remote('127.{2..11}', default.remote_table, 'remote_user', rand())",
-        "CREATE TABLE tablefunc16 (`x` int) AS remote('127.{2..11}', default.remote_table, 'remote_user', '[HIDDEN]', rand())",
-        "CREATE TABLE tablefunc17 (`x` int) AS remote('127.{2..11}', 'default.remote_table', 'remote_user', '[HIDDEN]', rand())",
-        "CREATE TABLE tablefunc18 (`x` int) AS remote('127.{2..11}', 'default', 'remote_table', 'remote_user', '[HIDDEN]', rand())",
-        "CREATE TABLE tablefunc19 (`x` int) AS remote('127.{2..11}', numbers(10), 'remote_user', '[HIDDEN]', rand())",
-        "CREATE TABLE tablefunc20 (`x` int) AS remoteSecure('127.{2..11}', 'default', 'remote_table', 'remote_user', '[HIDDEN]')",
-        "CREATE TABLE tablefunc21 (x int) AS remoteSecure('127.{2..11}', 'default', 'remote_table', 'remote_user', rand())",
-        "CREATE TABLE tablefunc22 (`x` int) AS mysql(named_collection_1, host = 'mysql80', port = 3306, database = 'mysql_db', `table` = 'mysql_table', user = 'mysql_user', password = '[HIDDEN]')",
-        "CREATE TABLE tablefunc23 (`x` int) AS postgresql(named_collection_2, password = '[HIDDEN]', host = 'postgres1', port = 5432, database = 'postgres_db', `table` = 'postgres_table', user = 'postgres_user')",
-        "CREATE TABLE tablefunc24 (`x` int) AS s3(named_collection_2, url = 'http://minio1:9001/root/data/test4.csv', access_key_id = 'minio', secret_access_key = '[HIDDEN]')",
-        "CREATE TABLE tablefunc25 (`x` int) AS remote(named_collection_6, addresses_expr = '127.{2..11}', database = 'default', `table` = 'remote_table', user = 'remote_user', password = '[HIDDEN]', sharding_key = rand())",
-        "CREATE TABLE tablefunc26 (`x` int) AS remoteSecure(named_collection_6, addresses_expr = '127.{2..11}', database = 'default', `table` = 'remote_table', user = 'remote_user', password = '[HIDDEN]')",
-        "CREATE TABLE tablefunc27 (x int) AS s3('http://minio1:9001/root/data/test9.csv.gz', 'NOSIGN', 'CSV')",
-        "CREATE TABLE tablefunc28 (`x` int) AS s3('http://minio1:9001/root/data/test10.csv.gz', 'minio', '[HIDDEN]')",
-        "CREATE TABLE tablefunc29 (`x` int) AS deltaLake('http://minio1:9001/root/data/test11.csv.gz', 'minio', '[HIDDEN]')",
-        f"CREATE TABLE tablefunc30 (`x` int) AS azureBlobStorage('{masked_azure_conn_string}', 'cont', 'test_simple.csv', 'CSV')",
-        f"CREATE TABLE tablefunc31 (`x` int) AS azureBlobStorage('{masked_azure_conn_string}', 'cont', 'test_simple_1.csv', 'CSV', 'none')",
-        f"CREATE TABLE tablefunc32 (`x` int) AS azureBlobStorage('{masked_azure_conn_string}', 'cont', 'test_simple_2.csv', 'CSV', 'none', 'auto')",
-        f"CREATE TABLE tablefunc33 (`x` int) AS azureBlobStorage('{azure_storage_account_url}', 'cont', 'test_simple_3.csv', '{azure_account_name}', '[HIDDEN]')",
-        f"CREATE TABLE tablefunc34 (`x` int) AS azureBlobStorage('{azure_storage_account_url}', 'cont', 'test_simple_4.csv', '{azure_account_name}', '[HIDDEN]', 'CSV')",
-        f"CREATE TABLE tablefunc35 (`x` int) AS azureBlobStorage('{azure_storage_account_url}', 'cont', 'test_simple_5.csv', '{azure_account_name}', '[HIDDEN]', 'CSV', 'none')",
-        f"CREATE TABLE tablefunc36 (`x` int) AS azureBlobStorage('{azure_storage_account_url}', 'cont', 'test_simple_6.csv', '{azure_account_name}', '[HIDDEN]', 'CSV', 'none', 'auto')",
-        f"CREATE TABLE tablefunc37 (`x` int) AS azureBlobStorage(named_collection_2, connection_string = '{masked_azure_conn_string}', container = 'cont', blob_path = 'test_simple_7.csv', format = 'CSV')",
-        f"CREATE TABLE tablefunc38 (`x` int) AS azureBlobStorage(named_collection_2, storage_account_url = '{azure_storage_account_url}', container = 'cont', blob_path = 'test_simple_8.csv', account_name = '{azure_account_name}', account_key = '[HIDDEN]')",
-        "CREATE TABLE tablefunc39 (`x` int) AS iceberg('http://minio1:9001/root/data/test11.csv.gz', 'minio', '[HIDDEN]')",
-        "CREATE TABLE tablefunc40 (`x` int) AS gcs('http://minio1:9001/root/data/test11.csv.gz', 'minio', '[HIDDEN]')",
-        "CREATE TABLE tablefunc41 (`x` int) AS icebergS3('http://minio1:9001/root/data/test11.csv.gz', 'minio', '[HIDDEN]')",
-        f"CREATE TABLE tablefunc42 (`x` int) AS icebergAzure('{azure_storage_account_url}', 'cont', 'test_simple_6.csv', '{azure_account_name}', '[HIDDEN]', 'CSV', 'none', 'auto')",
+        "CREATE TABLE tablefunc11 (`x` int) AS s3Cluster('test_shard_localhost', 'http://minio1:9001/root/data/test1.csv', 'minio', '[HIDDEN]')",
+        "CREATE TABLE tablefunc12 (x int) AS s3Cluster('test_shard_localhost', 'http://minio1:9001/root/data/test2.csv', 'CSV', 'x int')",
+        "CREATE TABLE tablefunc13 (`x` int) AS s3Cluster('test_shard_localhost', 'http://minio1:9001/root/data/test3.csv', 'minio', '[HIDDEN]', 'CSV')",
+        "CREATE TABLE tablefunc14 (x int) AS remote('127.{2..11}', default.remote_table)",
+        "CREATE TABLE tablefunc15 (x int) AS remote('127.{2..11}', default.remote_table, rand())",
+        "CREATE TABLE tablefunc16 (x int) AS remote('127.{2..11}', default.remote_table, 'remote_user')",
+        "CREATE TABLE tablefunc17 (`x` int) AS remote('127.{2..11}', default.remote_table, 'remote_user', '[HIDDEN]')",
+        "CREATE TABLE tablefunc18 (x int) AS remote('127.{2..11}', default.remote_table, 'remote_user', rand())",
+        "CREATE TABLE tablefunc19 (`x` int) AS remote('127.{2..11}', default.remote_table, 'remote_user', '[HIDDEN]', rand())",
+        "CREATE TABLE tablefunc20 (`x` int) AS remote('127.{2..11}', 'default.remote_table', 'remote_user', '[HIDDEN]', rand())",
+        "CREATE TABLE tablefunc21 (`x` int) AS remote('127.{2..11}', 'default', 'remote_table', 'remote_user', '[HIDDEN]', rand())",
+        "CREATE TABLE tablefunc22 (`x` int) AS remote('127.{2..11}', numbers(10), 'remote_user', '[HIDDEN]', rand())",
+        "CREATE TABLE tablefunc23 (`x` int) AS remoteSecure('127.{2..11}', 'default', 'remote_table', 'remote_user', '[HIDDEN]')",
+        "CREATE TABLE tablefunc24 (x int) AS remoteSecure('127.{2..11}', 'default', 'remote_table', 'remote_user', rand())",
+        "CREATE TABLE tablefunc25 (`x` int) AS mysql(named_collection_1, host = 'mysql80', port = 3306, database = 'mysql_db', `table` = 'mysql_table', user = 'mysql_user', password = '[HIDDEN]')",
+        "CREATE TABLE tablefunc26 (`x` int) AS postgresql(named_collection_2, password = '[HIDDEN]', host = 'postgres1', port = 5432, database = 'postgres_db', `table` = 'postgres_table', user = 'postgres_user')",
+        "CREATE TABLE tablefunc27 (`x` int) AS s3(named_collection_2, url = 'http://minio1:9001/root/data/test4.csv', access_key_id = 'minio', secret_access_key = '[HIDDEN]')",
+        "CREATE TABLE tablefunc28 (`x` int) AS remote(named_collection_6, addresses_expr = '127.{2..11}', database = 'default', `table` = 'remote_table', user = 'remote_user', password = '[HIDDEN]', sharding_key = rand())",
+        "CREATE TABLE tablefunc29 (`x` int) AS remoteSecure(named_collection_6, addresses_expr = '127.{2..11}', database = 'default', `table` = 'remote_table', user = 'remote_user', password = '[HIDDEN]')",
+        "CREATE TABLE tablefunc30 (x int) AS s3('http://minio1:9001/root/data/test9.csv.gz', 'NOSIGN', 'CSV')",
+        "CREATE TABLE tablefunc31 (`x` int) AS s3('http://minio1:9001/root/data/test10.csv.gz', 'minio', '[HIDDEN]')",
+        "CREATE TABLE tablefunc32 (`x` int) AS deltaLake('http://minio1:9001/root/data/test11.csv.gz', 'minio', '[HIDDEN]')",
+        f"CREATE TABLE tablefunc33 (`x` int) AS azureBlobStorage('{masked_azure_conn_string}', 'cont', 'test_simple.csv', 'CSV')",
+        f"CREATE TABLE tablefunc34 (`x` int) AS azureBlobStorage('{masked_azure_conn_string}', 'cont', 'test_simple_1.csv', 'CSV', 'none')",
+        f"CREATE TABLE tablefunc35 (`x` int) AS azureBlobStorage('{masked_azure_conn_string}', 'cont', 'test_simple_2.csv', 'CSV', 'none', 'auto')",
+        f"CREATE TABLE tablefunc36 (`x` int) AS azureBlobStorage('{azure_storage_account_url}', 'cont', 'test_simple_3.csv', '{azure_account_name}', '[HIDDEN]')",
+        f"CREATE TABLE tablefunc37 (`x` int) AS azureBlobStorage('{azure_storage_account_url}', 'cont', 'test_simple_4.csv', '{azure_account_name}', '[HIDDEN]', 'CSV')",
+        f"CREATE TABLE tablefunc38 (`x` int) AS azureBlobStorage('{azure_storage_account_url}', 'cont', 'test_simple_5.csv', '{azure_account_name}', '[HIDDEN]', 'CSV', 'none')",
+        f"CREATE TABLE tablefunc39 (`x` int) AS azureBlobStorage('{azure_storage_account_url}', 'cont', 'test_simple_6.csv', '{azure_account_name}', '[HIDDEN]', 'CSV', 'none', 'auto')",
+        f"CREATE TABLE tablefunc40 (`x` int) AS azureBlobStorage(named_collection_2, connection_string = '{masked_azure_conn_string}', container = 'cont', blob_path = 'test_simple_7.csv', format = 'CSV')",
+        f"CREATE TABLE tablefunc41 (`x` int) AS azureBlobStorage(named_collection_2, storage_account_url = '{azure_storage_account_url}', container = 'cont', blob_path = 'test_simple_8.csv', account_name = '{azure_account_name}', account_key = '[HIDDEN]')",
+        f"CREATE TABLE tablefunc42 (`x` int) AS azureBlobStorageCluster('test_shard_localhost', '{masked_azure_conn_string}', 'cont', 'test_simple_9.csv', 'CSV')",
+        f"CREATE TABLE tablefunc43 (`x` int) AS azureBlobStorageCluster('test_shard_localhost', '{masked_azure_conn_string}', 'cont', 'test_simple_10.csv', 'CSV', 'none')",
+        f"CREATE TABLE tablefunc44 (`x` int) AS azureBlobStorageCluster('test_shard_localhost', '{masked_azure_conn_string}', 'cont', 'test_simple_11.csv', 'CSV', 'none', 'auto')",
+        f"CREATE TABLE tablefunc45 (`x` int) AS azureBlobStorageCluster('test_shard_localhost', '{azure_storage_account_url}', 'cont', 'test_simple_12.csv', '{azure_account_name}', '[HIDDEN]')",
+        f"CREATE TABLE tablefunc46 (`x` int) AS azureBlobStorageCluster('test_shard_localhost', '{azure_storage_account_url}', 'cont', 'test_simple_13.csv', '{azure_account_name}', '[HIDDEN]', 'CSV')",
+        f"CREATE TABLE tablefunc47 (`x` int) AS azureBlobStorageCluster('test_shard_localhost', '{azure_storage_account_url}', 'cont', 'test_simple_14.csv', '{azure_account_name}', '[HIDDEN]', 'CSV', 'none')",
+        f"CREATE TABLE tablefunc48 (`x` int) AS azureBlobStorageCluster('test_shard_localhost', '{azure_storage_account_url}', 'cont', 'test_simple_15.csv', '{azure_account_name}', '[HIDDEN]', 'CSV', 'none', 'auto')",
+        f"CREATE TABLE tablefunc49 (`x` int) AS azureBlobStorageCluster('test_shard_localhost', named_collection_2, connection_string = '{masked_azure_conn_string}', container = 'cont', blob_path = 'test_simple_16.csv', format = 'CSV')",
+        f"CREATE TABLE tablefunc50 (`x` int) AS azureBlobStorageCluster('test_shard_localhost', named_collection_2, storage_account_url = '{azure_storage_account_url}', container = 'cont', blob_path = 'test_simple_17.csv', account_name = '{azure_account_name}', account_key = '[HIDDEN]')",
+        "CREATE TABLE tablefunc51 (`x` int) AS iceberg('http://minio1:9001/root/data/test11.csv.gz', 'minio', '[HIDDEN]')",
+        "CREATE TABLE tablefunc52 (`x` int) AS iceberg(named_collection_2, url = 'http://minio1:9001/root/data/test4.csv', access_key_id = 'minio', secret_access_key = '[HIDDEN]')",
+        "CREATE TABLE tablefunc53 (`x` int) AS icebergS3('http://minio1:9001/root/data/test11.csv.gz', 'minio', '[HIDDEN]')",
+        "CREATE TABLE tablefunc54 (`x` int) AS icebergS3(named_collection_2, url = 'http://minio1:9001/root/data/test4.csv', access_key_id = 'minio', secret_access_key = '[HIDDEN]')",
+        f"CREATE TABLE tablefunc55 (`x` int) AS icebergAzure('{masked_azure_conn_string}', 'cont', 'test_simple.csv')",
+        f"CREATE TABLE tablefunc56 (`x` int) AS icebergAzure('{azure_storage_account_url}', 'cont', 'test_simple.csv', '{azure_account_name}', '[HIDDEN]')",
+        f"CREATE TABLE tablefunc57 (`x` int) AS icebergAzure('{azure_storage_account_url}', 'cont', 'test_simple_6.csv', '{azure_account_name}', '[HIDDEN]', 'CSV', 'none', 'auto')",
+        f"CREATE TABLE tablefunc58 (`x` int) AS icebergAzure(named_collection_2, connection_string = '{masked_azure_conn_string}', container = 'cont', blob_path = 'test_simple_7.csv', format = 'CSV')",
+        f"CREATE TABLE tablefunc59 (`x` int) AS icebergAzure(named_collection_2, storage_account_url = '{azure_storage_account_url}', container = 'cont', blob_path = 'test_simple_8.csv', account_name = '{azure_account_name}', account_key = '[HIDDEN]')",
+        "CREATE TABLE tablefunc60 (`x` int) AS icebergS3Cluster('test_shard_localhost', 'http://minio1:9001/root/data/test11.csv.gz', 'minio', '[HIDDEN]')",
+        "CREATE TABLE tablefunc61 (`x` int) AS icebergS3Cluster('test_shard_localhost', named_collection_2, url = 'http://minio1:9001/root/data/test4.csv', access_key_id = 'minio', secret_access_key = '[HIDDEN]')",
+        f"CREATE TABLE tablefunc62 (`x` int) AS icebergAzureCluster('test_shard_localhost', '{masked_azure_conn_string}', 'cont', 'test_simple.csv')",
+        f"CREATE TABLE tablefunc63 (`x` int) AS icebergAzureCluster('test_shard_localhost', '{azure_storage_account_url}', 'cont', 'test_simple.csv', '{azure_account_name}', '[HIDDEN]')",
+        f"CREATE TABLE tablefunc64 (`x` int) AS icebergAzureCluster('test_shard_localhost', '{azure_storage_account_url}', 'cont', 'test_simple_6.csv', '{azure_account_name}', '[HIDDEN]', 'CSV', 'none', 'auto')",
+        f"CREATE TABLE tablefunc65 (`x` int) AS icebergAzureCluster('test_shard_localhost', named_collection_2, connection_string = '{masked_azure_conn_string}', container = 'cont', blob_path = 'test_simple_7.csv', format = 'CSV')",
+        f"CREATE TABLE tablefunc66 (`x` int) AS icebergAzureCluster('test_shard_localhost', named_collection_2, storage_account_url = '{azure_storage_account_url}', container = 'cont', blob_path = 'test_simple_8.csv', account_name = '{azure_account_name}', account_key = '[HIDDEN]')",
+        "CREATE TABLE tablefunc67 (`x` int) AS iceberg(storage_type = 's3', 'http://minio1:9001/root/data/test11.csv.gz', 'minio', '[HIDDEN]')",
+        "CREATE TABLE tablefunc68 (`x` int) AS iceberg(storage_type = 's3', named_collection_2, url = 'http://minio1:9001/root/data/test4.csv', access_key_id = 'minio', secret_access_key = '[HIDDEN]')",
+        f"CREATE TABLE tablefunc69 (`x` int) AS iceberg(storage_type = 'azure', '{masked_azure_conn_string}', 'cont', 'test_simple.csv')",
+        f"CREATE TABLE tablefunc70 (`x` int) AS iceberg(storage_type = 'azure', '{azure_storage_account_url}', 'cont', 'test_simple.csv', '{azure_account_name}', '[HIDDEN]')",
+        f"CREATE TABLE tablefunc71 (`x` int) AS iceberg(storage_type = 'azure', '{azure_storage_account_url}', 'cont', 'test_simple_6.csv', '{azure_account_name}', '[HIDDEN]', 'CSV', 'none', 'auto')",
+        f"CREATE TABLE tablefunc72 (`x` int) AS iceberg(storage_type = 'azure', named_collection_2, connection_string = '{masked_azure_conn_string}', container = 'cont', blob_path = 'test_simple_7.csv', format = 'CSV')",
+        f"CREATE TABLE tablefunc73 (`x` int) AS iceberg(storage_type = 'azure', named_collection_2, storage_account_url = '{azure_storage_account_url}', container = 'cont', blob_path = 'test_simple_8.csv', account_name = '{azure_account_name}', account_key = '[HIDDEN]')",
+        "CREATE TABLE tablefunc74 (`x` int) AS gcs('http://minio1:9001/root/data/test11.csv.gz', 'minio', '[HIDDEN]')",
     ],
     must_not_contain=[password],
 )
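Taken together, the expectations above pin down the masking contract for the generic iceberg function and its cluster variants: positional credentials, secret_access_key and account_key values are replaced with [HIDDEN], while storage_type selectors, URLs and user names stay readable. A standalone probe of that contract might look like this (hypothetical snippet; instance and the MinIO credentials are assumed from the test environment):

    # Hypothetical probe: regardless of whether the query itself succeeds, the text
    # persisted to system.query_log must show '[HIDDEN]' in place of the secret.
    try:
        instance.query(
            "SELECT * FROM iceberg(storage_type = 's3', "
            "'http://minio1:9001/root/data/test11.csv.gz', 'minio', 'minio123')"
        )
    except Exception:
        pass  # only the logged query text matters here
    instance.query("SYSTEM FLUSH LOGS")
    logged = instance.query(
        "SELECT query FROM system.query_log "
        "WHERE query LIKE '%iceberg%' ORDER BY event_time DESC LIMIT 1"
    )
    assert "[HIDDEN]" in logged and "minio123" not in logged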
diff --git a/tests/integration/test_storage_iceberg/configs/config.d/named_collections.xml b/tests/integration/test_storage_iceberg/configs/config.d/named_collections.xml
index b488638dd194..a80a3f55d06f 100644
--- a/tests/integration/test_storage_iceberg/configs/config.d/named_collections.xml
+++ b/tests/integration/test_storage_iceberg/configs/config.d/named_collections.xml
@@ -11,5 +11,22 @@
+        <s3_with_type>
+            <url>http://minio1:9001/root/</url>
+            <access_key_id>minio</access_key_id>
+            <secret_access_key>minio123</secret_access_key>
+            <storage_type>s3</storage_type>
+        </s3_with_type>
+        <azure_with_type>
+            <account_name>devstoreaccount1</account_name>
+            <account_key>Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==</account_key>
+            <storage_type>azure</storage_type>
+        </azure_with_type>
+        <hdfs_with_type>
+            <storage_type>hdfs</storage_type>
+        </hdfs_with_type>
+        <local_with_type>
+            <storage_type>local</storage_type>
+        </local_with_type>
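The collections added above differ from the pre-existing s3/azure/local ones only by the embedded storage_type field, which lets the generic iceberg function resolve the backend on its own. The three equivalent spellings the tests exercise, sketched against the s3 collections (illustrative queries; the table path is hypothetical):

    # Sketch: engine suffix, explicit storage_type argument, and storage_type taken
    # from the named collection should all reach the same S3-backed Iceberg table.
    for expr in (
        "icebergS3(s3, filename = 'iceberg_data/default/t/', format = Parquet, url = 'http://minio1:9001/root/')",
        "iceberg(s3, storage_type = 's3', filename = 'iceberg_data/default/t/', format = Parquet, url = 'http://minio1:9001/root/')",
        "iceberg(s3_with_type, filename = 'iceberg_data/default/t/', format = Parquet, url = 'http://minio1:9001/root/')",
    ):
        instance.query(f"SELECT count() FROM {expr}")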
diff --git a/tests/integration/test_storage_iceberg/test.py b/tests/integration/test_storage_iceberg/test.py
index e533f3c50d64..c755f05dfdcf 100644
--- a/tests/integration/test_storage_iceberg/test.py
+++ b/tests/integration/test_storage_iceberg/test.py
@@ -1,21 +1,15 @@
-import glob
-import json
 import logging
 import os
-import time
 import uuid
 from datetime import datetime
 
 import pyspark
 import pytest
-from azure.storage.blob import BlobServiceClient
 from minio.deleteobjects import DeleteObject
 from pyspark.sql.functions import (
-    current_timestamp,
    monotonically_increasing_id,
     row_number,
 )
-from pyspark.sql.readwriter import DataFrameWriter, DataFrameWriterV2
 from pyspark.sql.types import (
     ArrayType,
     BooleanType,
@@ -24,17 +18,14 @@
     StringType,
     StructField,
     StructType,
-    TimestampType,
 )
 from pyspark.sql.window import Window
 
-import helpers.client
 from helpers.cluster import ClickHouseCluster, ClickHouseInstance, is_arm
 from helpers.s3_tools import (
     AzureUploader,
     LocalUploader,
     S3Uploader,
-    get_file_contents,
     list_s3_objects,
     prepare_s3_bucket,
 )
@@ -202,6 +193,8 @@ def get_creation_expression(
     allow_dynamic_metadata_for_data_lakes=False,
     run_on_cluster=False,
     object_storage_cluster=False,
+    storage_type_as_arg=False,
+    storage_type_in_named_collection=False,
     **kwargs,
 ):
     settings_suffix = ""
@@ -210,9 +203,25 @@ def get_creation_expression(
         if allow_dynamic_metadata_for_data_lakes:
             settings.append("allow_dynamic_metadata_for_data_lakes = 1")
         if object_storage_cluster:
-            settings.append(f"object_storage_cluster = '{object_storage_cluster}'")
+            settings.append("object_storage_cluster = 'cluster_simple'")
         settings_suffix = " SETTINGS " + ", ".join(settings)
 
+    storage_arg = storage_type
+    engine_part = ""
+    if storage_type_in_named_collection:
+        storage_arg += "_with_type"
+    elif storage_type_as_arg:
+        storage_arg += f", storage_type='{storage_type}'"
+    else:
+        if storage_type == "s3":
+            engine_part = "S3"
+        elif storage_type == "azure":
+            engine_part = "Azure"
+        elif storage_type == "hdfs":
+            engine_part = "HDFS"
+        elif storage_type == "local":
+            engine_part = "Local"
+
     if storage_type == "s3":
         if "bucket" in kwargs:
             bucket = kwargs["bucket"]
@@ -221,16 +230,16 @@ def get_creation_expression(
 
         if run_on_cluster:
             assert table_function
-            return f"icebergS3Cluster('cluster_simple', s3, filename = 'iceberg_data/default/{table_name}/', format={format}, url = 'http://minio1:9001/{bucket}/')"
+            return f"iceberg{engine_part}Cluster('cluster_simple', {storage_arg}, filename = 'iceberg_data/default/{table_name}/', format={format}, url = 'http://minio1:9001/{bucket}/')"
         else:
             if table_function:
-                return f"icebergS3(s3, filename = 'iceberg_data/default/{table_name}/', format={format}, url = 'http://minio1:9001/{bucket}/')"
+                return f"iceberg{engine_part}({storage_arg}, filename = 'iceberg_data/default/{table_name}/', format={format}, url = 'http://minio1:9001/{bucket}/')"
             else:
                 return (
                     f"""
                     DROP TABLE IF EXISTS {table_name};
                     CREATE TABLE {table_name}
-                    ENGINE=IcebergS3(s3, filename = 'iceberg_data/default/{table_name}/', format={format}, url = 'http://minio1:9001/{bucket}/')"""
+                    ENGINE=Iceberg{engine_part}({storage_arg}, filename = 'iceberg_data/default/{table_name}/', format={format}, url = 'http://minio1:9001/{bucket}/')"""
                     + settings_suffix
                 )
@@ -238,19 +247,19 @@ def get_creation_expression(
         if run_on_cluster:
             assert table_function
             return f"""
-                icebergAzureCluster('cluster_simple', azure, container = '{cluster.azure_container_name}', storage_account_url = '{cluster.env_variables["AZURITE_STORAGE_ACCOUNT_URL"]}', blob_path = '/iceberg_data/default/{table_name}/', format={format})
+                iceberg{engine_part}Cluster('cluster_simple', {storage_arg}, container = '{cluster.azure_container_name}', storage_account_url = '{cluster.env_variables["AZURITE_STORAGE_ACCOUNT_URL"]}', blob_path = '/iceberg_data/default/{table_name}/', format={format})
             """
         else:
             if table_function:
                 return f"""
-                    icebergAzure(azure, container = '{cluster.azure_container_name}', storage_account_url = '{cluster.env_variables["AZURITE_STORAGE_ACCOUNT_URL"]}', blob_path = '/iceberg_data/default/{table_name}/', format={format})
+                    iceberg{engine_part}({storage_arg}, container = '{cluster.azure_container_name}', storage_account_url = '{cluster.env_variables["AZURITE_STORAGE_ACCOUNT_URL"]}', blob_path = '/iceberg_data/default/{table_name}/', format={format})
                 """
             else:
                 return (
                     f"""
                     DROP TABLE IF EXISTS {table_name};
                     CREATE TABLE {table_name}
-                    ENGINE=IcebergAzure(azure, container = {cluster.azure_container_name}, storage_account_url = '{cluster.env_variables["AZURITE_STORAGE_ACCOUNT_URL"]}', blob_path = '/iceberg_data/default/{table_name}/', format={format})"""
+                    ENGINE=Iceberg{engine_part}({storage_arg}, container = {cluster.azure_container_name}, storage_account_url = '{cluster.env_variables["AZURITE_STORAGE_ACCOUNT_URL"]}', blob_path = '/iceberg_data/default/{table_name}/', format={format})"""
                     + settings_suffix
                 )
 
@@ -259,14 +268,14 @@ def get_creation_expression(
 
         if table_function:
             return f"""
-                icebergLocal(local, path = '/iceberg_data/default/{table_name}/', format={format})
+                iceberg{engine_part}({storage_arg}, path = '/iceberg_data/default/{table_name}/', format={format})
             """
         else:
             return (
                 f"""
                 DROP TABLE IF EXISTS {table_name};
                 CREATE TABLE {table_name}
-                ENGINE=IcebergLocal(local, path = '/iceberg_data/default/{table_name}/', format={format})"""
+                ENGINE=Iceberg{engine_part}({storage_arg}, path = '/iceberg_data/default/{table_name}/', format={format})"""
                 + settings_suffix
             )
 
@@ -306,6 +315,7 @@ def create_iceberg_table(
     object_storage_cluster=False,
     **kwargs,
 ):
+    node.query(f"DROP TABLE IF EXISTS {table_name} SYNC")
     node.query(
         get_creation_expression(
             storage_type,
@@ -547,6 +557,52 @@ def test_types(started_cluster, format_version, storage_type):
         ]
     )
 
+    # Test storage type as function argument
+    table_function_expr = get_creation_expression(
+        storage_type,
+        TABLE_NAME,
+        started_cluster,
+        table_function=True,
+        storage_type_as_arg=True,
+    )
+    assert (
+        instance.query(f"SELECT a, b, c, d, e FROM {table_function_expr}").strip()
+        == "123\tstring\t2000-01-01\t['str1','str2']\ttrue"
+    )
+
+    assert instance.query(f"DESCRIBE {table_function_expr} FORMAT TSV") == TSV(
+        [
+            ["a", "Nullable(Int32)"],
+            ["b", "Nullable(String)"],
+            ["c", "Nullable(Date32)"],
+            ["d", "Array(Nullable(String))"],
+            ["e", "Nullable(Bool)"],
+        ]
+    )
+
+    # Test storage type as field in named collection
+    table_function_expr = get_creation_expression(
+        storage_type,
+        TABLE_NAME,
+        started_cluster,
+        table_function=True,
+        storage_type_in_named_collection=True,
+    )
+    assert (
+        instance.query(f"SELECT a, b, c, d, e FROM {table_function_expr}").strip()
+        == "123\tstring\t2000-01-01\t['str1','str2']\ttrue"
+    )
+
+    assert instance.query(f"DESCRIBE {table_function_expr} FORMAT TSV") == TSV(
+        [
+            ["a", "Nullable(Int32)"],
+            ["b", "Nullable(String)"],
+            ["c", "Nullable(Date32)"],
+            ["d", "Array(Nullable(String))"],
+            ["e", "Nullable(Bool)"],
+        ]
+    )
+
 
 def count_secondary_subqueries(started_cluster, query_id, expected, comment):
     for node_name, replica in started_cluster.instances.items():
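The distributed checks that follow reuse the pre-existing count_secondary_subqueries helper; the patch only adds call sites. For orientation, a sketch of what such a helper conventionally does (assumed logic; the actual body is outside this hunk):

    # Assumed sketch: flush logs on each node, then count the secondary (non-initial)
    # queries fanned out on behalf of the given initial query id.
    def count_secondary_subqueries_sketch(started_cluster, query_id, expected, comment):
        for node_name, replica in started_cluster.instances.items():
            replica.query("SYSTEM FLUSH LOGS")
            secondary = int(
                replica.query(
                    "SELECT count() FROM system.query_log "
                    "WHERE type = 'QueryFinish' AND is_initial_query = 0 "
                    f"AND initial_query_id = '{query_id}'"
                )
            )
            assert secondary == expected, f"{comment} on {node_name}"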
@@ -623,81 +679,92 @@ def add_df(mode):
         instance.query(f"SELECT * FROM {table_function_expr}").strip().split()
     )
 
+    def make_query_from_function(
+        run_on_cluster=False,
+        alt_syntax=False,
+        remote=False,
+        storage_type_as_arg=False,
+        storage_type_in_named_collection=False,
+    ):
+        expr = get_creation_expression(
+            storage_type,
+            TABLE_NAME,
+            started_cluster,
+            table_function=True,
+            run_on_cluster=run_on_cluster,
+            storage_type_as_arg=storage_type_as_arg,
+            storage_type_in_named_collection=storage_type_in_named_collection,
+        )
+        query_id = str(uuid.uuid4())
+        settings = "SETTINGS object_storage_cluster='cluster_simple'" if alt_syntax else ""
+        if remote:
+            query = f"SELECT * FROM remote('node2', {expr}) {settings}"
+        else:
+            query = f"SELECT * FROM {expr} {settings}"
+        response = instance.query(query, query_id=query_id).strip().split()
+        return response, query_id
+
     # Cluster Query with node1 as coordinator
-    table_function_expr_cluster = get_creation_expression(
-        storage_type,
-        TABLE_NAME,
-        started_cluster,
-        table_function=True,
-        run_on_cluster=True,
-    )
-    query_id_cluster = str(uuid.uuid4())
-    select_cluster = (
-        instance.query(
-            f"SELECT * FROM {table_function_expr_cluster}", query_id=query_id_cluster
-        )
-        .strip()
-        .split()
-    )
+    select_cluster, query_id_cluster = make_query_from_function(run_on_cluster=True)
 
     # Cluster Query with node1 as coordinator with alternative syntax
-    query_id_cluster_alt_syntax = str(uuid.uuid4())
-    select_cluster_alt_syntax = (
-        instance.query(
-            f"""
-            SELECT * FROM {table_function_expr}
-            SETTINGS object_storage_cluster='cluster_simple'
-            """,
-            query_id=query_id_cluster_alt_syntax,
-        )
-        .strip()
-        .split()
+    select_cluster_alt_syntax, query_id_cluster_alt_syntax = make_query_from_function(
+        run_on_cluster=True,
+        alt_syntax=True)
+
+    # Cluster Query with node1 as coordinator and storage type as arg
+    select_cluster_with_type_arg, query_id_cluster_with_type_arg = make_query_from_function(
+        run_on_cluster=True,
+        storage_type_as_arg=True,
     )
-    create_iceberg_table(storage_type, instance, TABLE_NAME, started_cluster, object_storage_cluster='cluster_simple')
-    query_id_cluster_table_engine = str(uuid.uuid4())
-    select_cluster_table_engine = (
-        instance.query(
-            f"""
-            SELECT * FROM {TABLE_NAME}
-            """,
-            query_id=query_id_cluster_table_engine,
-        )
-        .strip()
-        .split()
+    # Cluster Query with node1 as coordinator and storage type in named collection
+    select_cluster_with_type_in_nc, query_id_cluster_with_type_in_nc = make_query_from_function(
+        run_on_cluster=True,
+        storage_type_in_named_collection=True,
    )
-    select_remote_cluster = (
-        instance.query(f"SELECT * FROM remote('node2',{table_function_expr_cluster})")
-        .strip()
-        .split()
+    # Cluster Query with node1 as coordinator and storage type as arg, alternative syntax
+    select_cluster_with_type_arg_alt_syntax, query_id_cluster_with_type_arg_alt_syntax = make_query_from_function(
+        storage_type_as_arg=True,
+        alt_syntax=True,
     )
-    instance.query(f"DROP TABLE IF EXISTS `{TABLE_NAME}` SYNC")
-    create_iceberg_table(storage_type, instance, TABLE_NAME, started_cluster)
-    query_id_pure_table_engine = str(uuid.uuid4())
-    select_pure_table_engine = (
-        instance.query(
-            f"""
-            SELECT * FROM {TABLE_NAME}
-            """,
-            query_id=query_id_pure_table_engine,
-        )
-        .strip()
-        .split()
+    # Cluster Query with node1 as coordinator and storage type in named collection, alternative syntax
+    select_cluster_with_type_in_nc_alt_syntax, query_id_cluster_with_type_in_nc_alt_syntax = make_query_from_function(
+        storage_type_in_named_collection=True,
+        alt_syntax=True,
     )
-    query_id_pure_table_engine_cluster = str(uuid.uuid4())
-    select_pure_table_engine_cluster = (
-        instance.query(
-            f"""
-            SELECT * FROM {TABLE_NAME}
-            SETTINGS object_storage_cluster='cluster_simple'
-            """,
-            query_id=query_id_pure_table_engine_cluster,
+
+    select_remote_cluster, _ = make_query_from_function(run_on_cluster=True, remote=True)
+
+    def make_query_from_table(alt_syntax=False):
+        query_id = str(uuid.uuid4())
+        settings = "SETTINGS object_storage_cluster='cluster_simple'" if alt_syntax else ""
+        response = (
+            instance.query(
+                f"SELECT * FROM {TABLE_NAME} {settings}",
+                query_id=query_id,
+            )
+            .strip()
+            .split()
         )
-        .strip()
-        .split()
-    )
+        return response, query_id
+
+    create_iceberg_table(storage_type, instance, TABLE_NAME, started_cluster, object_storage_cluster=True)
+    select_cluster_table_engine, query_id_cluster_table_engine = make_query_from_table()
+
+    create_iceberg_table(storage_type, instance, TABLE_NAME, started_cluster)
+    select_pure_table_engine, query_id_pure_table_engine = make_query_from_table()
+    select_pure_table_engine_cluster, query_id_pure_table_engine_cluster = make_query_from_table(alt_syntax=True)
+
+    create_iceberg_table(storage_type, instance, TABLE_NAME, started_cluster, storage_type_as_arg=True)
+    select_pure_table_engine_with_type_arg, query_id_pure_table_engine_with_type_arg = make_query_from_table()
+    select_pure_table_engine_cluster_with_type_arg, query_id_pure_table_engine_cluster_with_type_arg = make_query_from_table(alt_syntax=True)
+
+    create_iceberg_table(storage_type, instance, TABLE_NAME, started_cluster, storage_type_in_named_collection=True)
+    select_pure_table_engine_with_type_in_nc, query_id_pure_table_engine_with_type_in_nc = make_query_from_table()
+    select_pure_table_engine_cluster_with_type_in_nc, query_id_pure_table_engine_cluster_with_type_in_nc = make_query_from_table(alt_syntax=True)
 
     # Simple size check
     assert len(select_regular) == 600
@@ -705,16 +772,32 @@ def add_df(mode):
     assert len(select_cluster) == 600
     assert len(select_cluster_alt_syntax) == 600
     assert len(select_cluster_table_engine) == 600
     assert len(select_remote_cluster) == 600
+    assert len(select_cluster_with_type_arg) == 600
+    assert len(select_cluster_with_type_in_nc) == 600
+    assert len(select_cluster_with_type_arg_alt_syntax) == 600
+    assert len(select_cluster_with_type_in_nc_alt_syntax) == 600
     assert len(select_pure_table_engine) == 600
     assert len(select_pure_table_engine_cluster) == 600
+    assert len(select_pure_table_engine_with_type_arg) == 600
+    assert len(select_pure_table_engine_cluster_with_type_arg) == 600
+    assert len(select_pure_table_engine_with_type_in_nc) == 600
+    assert len(select_pure_table_engine_cluster_with_type_in_nc) == 600
 
     # Actual check
     assert select_cluster == select_regular
     assert select_cluster_alt_syntax == select_regular
     assert select_cluster_table_engine == select_regular
     assert select_remote_cluster == select_regular
+    assert select_cluster_with_type_arg == select_regular
+    assert select_cluster_with_type_in_nc == select_regular
+    assert select_cluster_with_type_arg_alt_syntax == select_regular
+    assert select_cluster_with_type_in_nc_alt_syntax == select_regular
     assert select_pure_table_engine == select_regular
     assert select_pure_table_engine_cluster == select_regular
+    assert select_pure_table_engine_with_type_arg == select_regular
+    assert select_pure_table_engine_cluster_with_type_arg == select_regular
+    assert select_pure_table_engine_with_type_in_nc == select_regular
+    assert select_pure_table_engine_cluster_with_type_in_nc == select_regular
 
     # Check query_log
     for replica in started_cluster.instances.values():
@@ -723,8 +806,16 @@ def add_df(mode):
     count_secondary_subqueries(started_cluster, query_id_cluster, 1, "table function")
     count_secondary_subqueries(started_cluster, query_id_cluster_alt_syntax, 1, "table function alt syntax")
     count_secondary_subqueries(started_cluster, query_id_cluster_table_engine, 1, "cluster table engine")
+    count_secondary_subqueries(started_cluster, query_id_cluster_with_type_arg, 1, "table function with storage type in args")
+    count_secondary_subqueries(started_cluster, query_id_cluster_with_type_in_nc, 1, "table function with storage type in named collection")
+    count_secondary_subqueries(started_cluster, query_id_cluster_with_type_arg_alt_syntax, 1, "table function with storage type in args alt syntax")
+    count_secondary_subqueries(started_cluster, query_id_cluster_with_type_in_nc_alt_syntax, 1, "table function with storage type in named collection alt syntax")
     count_secondary_subqueries(started_cluster, query_id_pure_table_engine, 0, "table engine")
     count_secondary_subqueries(started_cluster, query_id_pure_table_engine_cluster, 1, "table engine with cluster setting")
+    count_secondary_subqueries(started_cluster, query_id_pure_table_engine_with_type_arg, 0, "table engine with storage type in args")
+    count_secondary_subqueries(started_cluster, query_id_pure_table_engine_cluster_with_type_arg, 1, "table engine with cluster setting with storage type in args")
+    count_secondary_subqueries(started_cluster, query_id_pure_table_engine_with_type_in_nc, 0, "table engine with storage type in named collection")
+    count_secondary_subqueries(started_cluster, query_id_pure_table_engine_cluster_with_type_in_nc, 1, "table engine with cluster setting with storage type in named collection")
 
 
 @pytest.mark.parametrize("format_version", ["1", "2"])