-
Notifications
You must be signed in to change notification settings - Fork 8
Generalize engine definition for Iceberg tables #675
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
cf5e8ac
0df7974
c619018
af6903b
2f4960c
ab3f58d
c6e3681
e2ad619
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -3,9 +3,12 @@ | |
#include <Common/KnownObjectNames.h> | ||
#include <Common/re2.h> | ||
#include <Common/maskURIPassword.h> | ||
#include <Common/NamedCollections/NamedCollections.h> | ||
#include <Common/NamedCollections/NamedCollectionsFactory.h> | ||
#include <Core/QualifiedTableName.h> | ||
#include <base/defines.h> | ||
#include <boost/algorithm/string/predicate.hpp> | ||
#include <Poco/String.h> | ||
|
||
|
||
namespace DB | ||
|
@@ -29,6 +32,21 @@ class AbstractFunction | |
virtual ~Arguments() = default; | ||
virtual size_t size() const = 0; | ||
virtual std::unique_ptr<Argument> at(size_t n) const = 0; | ||
void skipArgument(size_t n) { skipped_indexes.insert(n); } | ||
void unskipArguments() { skipped_indexes.clear(); } | ||
/// Maps `n`, an index that does not count skipped arguments, to the matching index in the full argument list. | ||
/// Each skipped index that is not greater than the running value of `n` shifts `n` one position to the right. | ||
/// With no skipped indexes, `n` is returned unchanged. | ||
size_t getRealIndex(size_t n) const | ||
{ | ||
/// `skipped_indexes` is a std::set, so the indexes are visited in ascending order. | ||
for (auto idx : skipped_indexes) | ||
{ | ||
/// Every remaining skipped index is larger than `n`, so none of them can shift it any further. | ||
if (n < idx) | ||
break; | ||
++n; | ||
} | ||
return n; | ||
} | ||
size_t skippedSize() const { return skipped_indexes.size(); } | ||
private: | ||
std::set<size_t> skipped_indexes; | ||
}; | ||
|
||
virtual ~AbstractFunction() = default; | ||
|
@@ -75,14 +93,15 @@ class FunctionSecretArgumentsFinder | |
{ | ||
if (index >= function->arguments->size()) | ||
return; | ||
auto real_index = function->arguments->getRealIndex(index); | ||
if (!result.count) | ||
{ | ||
result.start = index; | ||
result.start = real_index; | ||
result.are_named = argument_is_named; | ||
} | ||
chassert(index >= result.start); /// We always check arguments consecutively | ||
chassert(real_index >= result.start); /// We always check arguments consecutively | ||
chassert(result.replacement.empty()); /// We shouldn't use replacement with masking other arguments | ||
result.count = index + 1 - result.start; | ||
result.count = real_index + 1 - result.start; | ||
if (!argument_is_named) | ||
result.are_named = false; | ||
} | ||
|
@@ -100,14 +119,18 @@ class FunctionSecretArgumentsFinder | |
{ | ||
findMongoDBSecretArguments(); | ||
} | ||
else if (function->name() == "iceberg") | ||
{ | ||
findIcebergFunctionSecretArguments(); | ||
} | ||
else if ((function->name() == "s3") || (function->name() == "cosn") || (function->name() == "oss") || | ||
(function->name() == "deltaLake") || (function->name() == "hudi") || (function->name() == "iceberg") || | ||
(function->name() == "deltaLake") || (function->name() == "hudi") || | ||
(function->name() == "gcs") || (function->name() == "icebergS3")) | ||
{ | ||
/// s3('url', 'aws_access_key_id', 'aws_secret_access_key', ...) | ||
findS3FunctionSecretArguments(/* is_cluster_function= */ false); | ||
} | ||
else if (function->name() == "s3Cluster") | ||
else if ((function->name() == "s3Cluster") || (function->name() == "icebergS3Cluster")) | ||
{ | ||
/// s3Cluster('cluster_name', 'url', 'aws_access_key_id', 'aws_secret_access_key', ...) | ||
findS3FunctionSecretArguments(/* is_cluster_function= */ true); | ||
|
@@ -117,7 +140,7 @@ class FunctionSecretArgumentsFinder | |
/// azureBlobStorage(connection_string|storage_account_url, container_name, blobpath, account_name, account_key, format, compression, structure) | ||
findAzureBlobStorageFunctionSecretArguments(/* is_cluster_function= */ false); | ||
} | ||
else if (function->name() == "azureBlobStorageCluster") | ||
else if ((function->name() == "azureBlobStorageCluster") || (function->name() == "icebergAzureCluster")) | ||
{ | ||
/// azureBlobStorageCluster(cluster, connection_string|storage_account_url, container_name, blobpath, [account_name, account_key, format, compression, structure]) | ||
findAzureBlobStorageFunctionSecretArguments(/* is_cluster_function= */ true); | ||
|
@@ -218,11 +241,18 @@ class FunctionSecretArgumentsFinder | |
findSecretNamedArgument("secret_access_key", 1); | ||
return; | ||
} | ||
if (is_cluster_function && isNamedCollectionName(1)) | ||
{ | ||
/// s3Cluster(cluster, named_collection, ..., secret_access_key = 'secret_access_key', ...) | ||
findSecretNamedArgument("secret_access_key", 2); | ||
return; | ||
} | ||
|
||
/// We should check other arguments first because we don't need to do any replacement in case of | ||
/// s3('url', NOSIGN, 'format' [, 'compression'] [, extra_credentials(..)] [, headers(..)]) | ||
/// s3('url', 'format', 'structure' [, 'compression'] [, extra_credentials(..)] [, headers(..)]) | ||
size_t count = excludeS3OrURLNestedMaps(); | ||
|
||
if ((url_arg_idx + 3 <= count) && (count <= url_arg_idx + 4)) | ||
{ | ||
String second_arg; | ||
|
@@ -287,6 +317,48 @@ class FunctionSecretArgumentsFinder | |
markSecretArgument(url_arg_idx + 4); | ||
} | ||
|
||
std::string findIcebergStorageType() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why do you have There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Couldn't you implement the same remove argument approach instead of doing the "skipIndices" thing? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
{ | ||
std::string storage_type = "s3"; | ||
|
||
size_t count = function->arguments->size(); | ||
if (!count) | ||
return storage_type; | ||
|
||
auto storage_type_idx = findNamedArgument(&storage_type, "storage_type"); | ||
if (storage_type_idx != -1) | ||
{ | ||
storage_type = Poco::toLower(storage_type); | ||
function->arguments->skipArgument(storage_type_idx); | ||
} | ||
else if (isNamedCollectionName(0)) | ||
{ | ||
std::string collection_name; | ||
if (function->arguments->at(0)->tryGetString(&collection_name, true)) | ||
{ | ||
NamedCollectionPtr collection = NamedCollectionFactory::instance().tryGet(collection_name); | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. In which case would If there is a chance this will fail, shouldn't you throw? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. isNamedCollectionName actually only checks that this argument is an identifier. This code is only for masking passwords and other sensitive info in logs etc., and getting storage_type from the named collection here is needed only because S3 and Azure have passwords in different positions. I believe that if this code can't detect the type correctly, it should not break query execution more than it was broken before, because it is used, for example, when ClickHouse writes the query after it has already caught another exception. |
||
if (collection && collection->has("storage_type")) | ||
{ | ||
storage_type = Poco::toLower(collection->get<std::string>("storage_type")); | ||
} | ||
} | ||
} | ||
|
||
return storage_type; | ||
} | ||
|
||
/// Finds secret arguments of the `iceberg` table function. | ||
/// The storage type returned by findIcebergStorageType() selects which argument layout is parsed: | ||
/// "s3" uses the S3 function layout, "azure" uses the AzureBlobStorage function layout. | ||
/// For any other storage type nothing is marked as secret. | ||
void findIcebergFunctionSecretArguments() | ||
{ | ||
/// May mark the named `storage_type` argument as skipped in `function->arguments`. | ||
auto storage_type = findIcebergStorageType(); | ||
|
||
/// `false` is passed as `is_cluster_function`: this is the non-cluster variant. | ||
if (storage_type == "s3") | ||
findS3FunctionSecretArguments(false); | ||
else if (storage_type == "azure") | ||
findAzureBlobStorageFunctionSecretArguments(false); | ||
|
||
/// Clear the skipped indexes recorded above so that `function->arguments` is left without skips. | ||
function->arguments->unskipArguments(); | ||
} | ||
|
||
bool maskAzureConnectionString(ssize_t url_arg_idx, bool argument_is_named = false, size_t start = 0) | ||
{ | ||
String url_arg; | ||
|
@@ -310,7 +382,7 @@ class FunctionSecretArgumentsFinder | |
if (RE2::Replace(&url_arg, account_key_pattern, "AccountKey=[HIDDEN]\\1")) | ||
{ | ||
chassert(result.count == 0); /// We shouldn't use replacement with masking other arguments | ||
result.start = url_arg_idx; | ||
result.start = function->arguments->getRealIndex(url_arg_idx); | ||
result.are_named = argument_is_named; | ||
result.count = 1; | ||
result.replacement = url_arg; | ||
|
@@ -458,6 +530,7 @@ class FunctionSecretArgumentsFinder | |
void findTableEngineSecretArguments() | ||
{ | ||
const String & engine_name = function->name(); | ||
|
||
if (engine_name == "ExternalDistributed") | ||
{ | ||
/// ExternalDistributed('engine', 'host:port', 'database', 'table', 'user', 'password') | ||
|
@@ -475,10 +548,13 @@ class FunctionSecretArgumentsFinder | |
{ | ||
findMongoDBSecretArguments(); | ||
} | ||
else if (engine_name == "Iceberg") | ||
{ | ||
findIcebergTableEngineSecretArguments(); | ||
} | ||
else if ((engine_name == "S3") || (engine_name == "COSN") || (engine_name == "OSS") | ||
|| (engine_name == "DeltaLake") || (engine_name == "Hudi") | ||
|| (engine_name == "Iceberg") || (engine_name == "IcebergS3") | ||
|| (engine_name == "S3Queue")) | ||
|| (engine_name == "IcebergS3") || (engine_name == "S3Queue")) | ||
{ | ||
/// S3('url', ['aws_access_key_id', 'aws_secret_access_key',] ...) | ||
findS3TableEngineSecretArguments(); | ||
|
@@ -487,7 +563,7 @@ class FunctionSecretArgumentsFinder | |
{ | ||
findURLSecretArguments(); | ||
} | ||
else if (engine_name == "AzureBlobStorage") | ||
else if ((engine_name == "AzureBlobStorage") || (engine_name == "IcebergAzure")) | ||
{ | ||
findAzureBlobStorageTableEngineSecretArguments(); | ||
} | ||
|
@@ -579,6 +655,18 @@ class FunctionSecretArgumentsFinder | |
markSecretArgument(url_arg_idx + 4); | ||
} | ||
|
||
/// Finds secret arguments of the `Iceberg` table engine. | ||
/// The storage type returned by findIcebergStorageType() selects which argument layout is parsed: | ||
/// "s3" uses the S3 table engine layout, "azure" uses the AzureBlobStorage table engine layout. | ||
/// For any other storage type nothing is marked as secret. | ||
void findIcebergTableEngineSecretArguments() | ||
{ | ||
/// May mark the named `storage_type` argument as skipped in `function->arguments`. | ||
auto storage_type = findIcebergStorageType(); | ||
|
||
if (storage_type == "s3") | ||
findS3TableEngineSecretArguments(); | ||
else if (storage_type == "azure") | ||
findAzureBlobStorageTableEngineSecretArguments(); | ||
|
||
/// Clear the skipped indexes recorded above so that `function->arguments` is left without skips. | ||
function->arguments->unskipArguments(); | ||
} | ||
|
||
void findDatabaseEngineSecretArguments() | ||
{ | ||
const String & engine_name = function->name(); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is kind of confusing. I get the idea, but not entirely, nor why it has to be done this way.
Why do you have to manually call `getRealIndex` in some places, knowing you already implemented that in `ArgumentsAST`?
Why unskip?
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In `markSecretArgument` and `maskAzureConnectionString` the index escapes to outside code, which works with the raw index rather than going through this local `Arguments` class.
Like here - https://github.com/ClickHouse/ClickHouse/blob/master/src/Analyzer/Resolve/QueryAnalyzer.cpp#L2932