Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 17 additions & 17 deletions src/Storages/ObjectStorage/Azure/Configuration.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -149,9 +149,9 @@ void StorageAzureConfiguration::fromNamedCollection(const NamedCollection & coll
if (collection.has("account_key"))
account_key = collection.get<String>("account_key");

structure = collection.getOrDefault<String>("structure", "auto");
format = collection.getOrDefault<String>("format", format);
compression_method = collection.getOrDefault<String>("compression_method", collection.getOrDefault<String>("compression", "auto"));
setStructure(collection.getOrDefault<String>("structure", "auto"));
setFormat(collection.getOrDefault<String>("format", getFormat()));
setCompressionMethod(collection.getOrDefault<String>("compression_method", collection.getOrDefault<String>("compression", "auto")));

blobs_paths = {blob_path};
connection_params = getConnectionParams(connection_url, container_name, account_name, account_key, context);
Expand Down Expand Up @@ -187,12 +187,12 @@ void StorageAzureConfiguration::fromAST(ASTs & engine_args, ContextPtr context,
auto fourth_arg = checkAndGetLiteralArgument<String>(engine_args[3], "format/account_name");
if (is_format_arg(fourth_arg))
{
format = fourth_arg;
setFormat(fourth_arg);
}
else
{
if (with_structure)
structure = fourth_arg;
setStructure(fourth_arg);
else
throw Exception(
ErrorCodes::BAD_ARGUMENTS,
Expand All @@ -204,8 +204,8 @@ void StorageAzureConfiguration::fromAST(ASTs & engine_args, ContextPtr context,
auto fourth_arg = checkAndGetLiteralArgument<String>(engine_args[3], "format/account_name");
if (is_format_arg(fourth_arg))
{
format = fourth_arg;
compression_method = checkAndGetLiteralArgument<String>(engine_args[4], "compression");
setFormat(fourth_arg);
setCompressionMethod(checkAndGetLiteralArgument<String>(engine_args[4], "compression"));
}
else
{
Expand All @@ -220,9 +220,9 @@ void StorageAzureConfiguration::fromAST(ASTs & engine_args, ContextPtr context,
{
if (with_structure)
{
format = fourth_arg;
compression_method = checkAndGetLiteralArgument<String>(engine_args[4], "compression");
structure = checkAndGetLiteralArgument<String>(engine_args[5], "structure");
setFormat(fourth_arg);
setCompressionMethod(checkAndGetLiteralArgument<String>(engine_args[4], "compression"));
setStructure(checkAndGetLiteralArgument<String>(engine_args[5], "structure"));
}
else
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Format and compression must be last arguments");
Expand All @@ -234,12 +234,12 @@ void StorageAzureConfiguration::fromAST(ASTs & engine_args, ContextPtr context,
auto sixth_arg = checkAndGetLiteralArgument<String>(engine_args[5], "format/structure");
if (is_format_arg(sixth_arg))
{
format = sixth_arg;
setFormat(sixth_arg);
}
else
{
if (with_structure)
structure = sixth_arg;
setStructure(sixth_arg);
else
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unknown format {}", sixth_arg);
}
Expand All @@ -258,8 +258,8 @@ void StorageAzureConfiguration::fromAST(ASTs & engine_args, ContextPtr context,
auto sixth_arg = checkAndGetLiteralArgument<String>(engine_args[5], "format/account_name");
if (!is_format_arg(sixth_arg))
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unknown format {}", sixth_arg);
format = sixth_arg;
compression_method = checkAndGetLiteralArgument<String>(engine_args[6], "compression");
setFormat(sixth_arg);
setCompressionMethod(checkAndGetLiteralArgument<String>(engine_args[6], "compression"));
}
else if (with_structure && engine_args.size() == 8)
{
Expand All @@ -269,9 +269,9 @@ void StorageAzureConfiguration::fromAST(ASTs & engine_args, ContextPtr context,
auto sixth_arg = checkAndGetLiteralArgument<String>(engine_args[5], "format");
if (!is_format_arg(sixth_arg))
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unknown format {}", sixth_arg);
format = sixth_arg;
compression_method = checkAndGetLiteralArgument<String>(engine_args[6], "compression");
structure = checkAndGetLiteralArgument<String>(engine_args[7], "structure");
setFormat(sixth_arg);
setCompressionMethod (checkAndGetLiteralArgument<String>(engine_args[6], "compression"));
setStructure(checkAndGetLiteralArgument<String>(engine_args[7], "structure"));
}

blobs_paths = {blob_path};
Expand Down
8 changes: 8 additions & 0 deletions src/Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h
Original file line number Diff line number Diff line change
Expand Up @@ -373,6 +373,14 @@ class StorageIcebergConfiguration : public StorageObjectStorage::Configuration,
return getImpl().createArgsWithAccessData();
}

/// Returns the format name reported by the object obtained from getImpl().
const String & getFormat() const override
{
    return getImpl().getFormat();
}
/// Returns the compression method reported by the object obtained from getImpl().
const String & getCompressionMethod() const override
{
    return getImpl().getCompressionMethod();
}
/// Returns the structure string reported by the object obtained from getImpl().
const String & getStructure() const override
{
    return getImpl().getStructure();
}

/// Passes the new format name on to the object obtained from getImpl().
void setFormat(const String & format_) override
{
    getImpl().setFormat(format_);
}
/// Passes the new compression method on to the object obtained from getImpl().
void setCompressionMethod(const String & compression_method_) override
{
    getImpl().setCompressionMethod(compression_method_);
}
/// Passes the new structure string on to the object obtained from getImpl().
void setStructure(const String & structure_) override
{
    getImpl().setStructure(structure_);
}

const StorageObjectStorageSettings & getSettingsRef() const override
{
return getImpl().getSettingsRef();
Expand Down
2 changes: 1 addition & 1 deletion src/Storages/ObjectStorage/DataLakes/HudiMetadata.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ Strings HudiMetadata::getDataFilesImpl() const
{
auto configuration_ptr = configuration.lock();
auto log = getLogger("HudiMetadata");
const auto keys = listFiles(*object_storage, *configuration_ptr, "", Poco::toLower(configuration_ptr->format));
const auto keys = listFiles(*object_storage, *configuration_ptr, "", Poco::toLower(configuration_ptr->getFormat()));

using Partition = std::string;
using FileID = std::string;
Expand Down
16 changes: 8 additions & 8 deletions src/Storages/ObjectStorage/HDFS/Configuration.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -111,23 +111,23 @@ void StorageHDFSConfiguration::fromAST(ASTs & args, ContextPtr context, bool wit

if (args.size() > 1)
{
format = checkAndGetLiteralArgument<String>(args[1], "format_name");
setFormat(checkAndGetLiteralArgument<String>(args[1], "format_name"));
}

if (with_structure)
{
if (args.size() > 2)
{
structure = checkAndGetLiteralArgument<String>(args[2], "structure");
setStructure(checkAndGetLiteralArgument<String>(args[2], "structure"));
}
if (args.size() > 3)
{
compression_method = checkAndGetLiteralArgument<String>(args[3], "compression_method");
setCompressionMethod(checkAndGetLiteralArgument<String>(args[3], "compression_method"));
}
}
else if (args.size() > 2)
{
compression_method = checkAndGetLiteralArgument<String>(args[2], "compression_method");
setCompressionMethod(checkAndGetLiteralArgument<String>(args[2], "compression_method"));
}

setURL(url_str);
Expand All @@ -143,10 +143,10 @@ void StorageHDFSConfiguration::fromNamedCollection(const NamedCollection & colle
else
url_str = collection.get<String>("url");

format = collection.getOrDefault<String>("format", "auto");
compression_method = collection.getOrDefault<String>("compression_method",
collection.getOrDefault<String>("compression", "auto"));
structure = collection.getOrDefault<String>("structure", "auto");
setFormat(collection.getOrDefault<String>("format", "auto"));
setCompressionMethod(collection.getOrDefault<String>("compression_method",
collection.getOrDefault<String>("compression", "auto")));
setStructure(collection.getOrDefault<String>("structure", "auto"));

setURL(url_str);
}
Expand Down
14 changes: 7 additions & 7 deletions src/Storages/ObjectStorage/Local/Configuration.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@ extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
/// Fills this configuration from a named collection.
/// `path` is read without a default; `format`, `compression_method` and `structure` fall back to "auto".
/// When `compression_method` is absent, the `compression` key is consulted before falling back to "auto".
/// The ContextPtr argument is unnamed and not used here.
/// NOTE(review): the three direct member assignments followed by three setter calls look like the
/// before/after lines of a rendered diff — confirm which form is the live code.
void StorageLocalConfiguration::fromNamedCollection(const NamedCollection & collection, ContextPtr)
{
path = collection.get<String>("path");
format = collection.getOrDefault<String>("format", "auto");
compression_method = collection.getOrDefault<String>("compression_method", collection.getOrDefault<String>("compression", "auto"));
structure = collection.getOrDefault<String>("structure", "auto");
setFormat(collection.getOrDefault<String>("format", "auto"));
setCompressionMethod(collection.getOrDefault<String>("compression_method", collection.getOrDefault<String>("compression", "auto")));
setStructure(collection.getOrDefault<String>("structure", "auto"));
/// The single configured path becomes the whole path list.
paths = {path};
}

Expand All @@ -46,23 +46,23 @@ void StorageLocalConfiguration::fromAST(ASTs & args, ContextPtr context, bool wi

if (args.size() > 1)
{
format = checkAndGetLiteralArgument<String>(args[1], "format_name");
setFormat(checkAndGetLiteralArgument<String>(args[1], "format_name"));
}

if (with_structure)
{
if (args.size() > 2)
{
structure = checkAndGetLiteralArgument<String>(args[2], "structure");
setStructure(checkAndGetLiteralArgument<String>(args[2], "structure"));
}
if (args.size() > 3)
{
compression_method = checkAndGetLiteralArgument<String>(args[3], "compression_method");
setCompressionMethod(checkAndGetLiteralArgument<String>(args[3], "compression_method"));
}
}
else if (args.size() > 2)
{
compression_method = checkAndGetLiteralArgument<String>(args[2], "compression_method");
setCompressionMethod(checkAndGetLiteralArgument<String>(args[2], "compression_method"));
}
paths = {path};
}
Expand Down
10 changes: 5 additions & 5 deletions src/Storages/ObjectStorage/ReadBufferIterator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ ReadBufferIterator::ReadBufferIterator(
, read_keys(read_keys_)
, prev_read_keys_size(read_keys_.size())
{
if (configuration->format != "auto")
format = configuration->format;
if (configuration->getFormat() != "auto")
format = configuration->getFormat();
}

SchemaCache::Key ReadBufferIterator::getKeyForSchemaCache(const ObjectInfo & object_info, const String & format_name) const
Expand Down Expand Up @@ -148,7 +148,7 @@ std::unique_ptr<ReadBuffer> ReadBufferIterator::recreateLastReadBuffer()

auto impl = StorageObjectStorageSource::createReadBuffer(*current_object_info, object_storage, context, getLogger("ReadBufferIterator"));

const auto compression_method = chooseCompressionMethod(current_object_info->getFileName(), configuration->compression_method);
const auto compression_method = chooseCompressionMethod(current_object_info->getFileName(), configuration->getCompressionMethod());
const auto zstd_window = static_cast<int>(context->getSettingsRef()[Setting::zstd_window_log_max]);

return wrapReadBufferWithCompressionMethod(std::move(impl), compression_method, zstd_window);
Expand Down Expand Up @@ -266,13 +266,13 @@ ReadBufferIterator::Data ReadBufferIterator::next()
using ObjectInfoInArchive = StorageObjectStorageSource::ArchiveIterator::ObjectInfoInArchive;
if (const auto * object_info_in_archive = dynamic_cast<const ObjectInfoInArchive *>(current_object_info.get()))
{
compression_method = chooseCompressionMethod(filename, configuration->compression_method);
compression_method = chooseCompressionMethod(filename, configuration->getCompressionMethod());
const auto & archive_reader = object_info_in_archive->archive_reader;
read_buf = archive_reader->readFile(object_info_in_archive->path_in_archive, /*throw_on_not_found=*/true);
}
else
{
compression_method = chooseCompressionMethod(filename, configuration->compression_method);
compression_method = chooseCompressionMethod(filename, configuration->getCompressionMethod());
read_buf = StorageObjectStorageSource::createReadBuffer(*current_object_info, object_storage, getContext(), getLogger("ReadBufferIterator"));
}

Expand Down
20 changes: 10 additions & 10 deletions src/Storages/ObjectStorage/S3/Configuration.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -186,9 +186,9 @@ void StorageS3Configuration::fromNamedCollection(const NamedCollection & collect
auth_settings[S3AuthSetting::expiration_window_seconds] = collection.getOrDefault<UInt64>("expiration_window_seconds", S3::DEFAULT_EXPIRATION_WINDOW_SECONDS);
auth_settings[S3AuthSetting::session_token] = collection.getOrDefault<String>("session_token", "");

format = collection.getOrDefault<String>("format", format);
compression_method = collection.getOrDefault<String>("compression_method", collection.getOrDefault<String>("compression", "auto"));
structure = collection.getOrDefault<String>("structure", "auto");
setFormat(collection.getOrDefault<String>("format", getFormat()));
setCompressionMethod(collection.getOrDefault<String>("compression_method", collection.getOrDefault<String>("compression", "auto")));
setStructure(collection.getOrDefault<String>("structure", "auto"));

request_settings = S3::S3RequestSettings(collection, settings, /* validate_settings */true);

Expand Down Expand Up @@ -433,14 +433,14 @@ void StorageS3Configuration::fromAST(ASTs & args, ContextPtr context, bool with_
/// Set format to configuration only if it's not 'auto',
/// because we can have default format set in configuration.
if (format_ != "auto")
format = format_;
setFormat(format_);
}

if (engine_args_to_idx.contains("structure"))
structure = checkAndGetLiteralArgument<String>(args[engine_args_to_idx["structure"]], "structure");
setStructure(checkAndGetLiteralArgument<String>(args[engine_args_to_idx["structure"]], "structure"));

if (engine_args_to_idx.contains("compression_method"))
compression_method = checkAndGetLiteralArgument<String>(args[engine_args_to_idx["compression_method"]], "compression_method");
setCompressionMethod(checkAndGetLiteralArgument<String>(args[engine_args_to_idx["compression_method"]], "compression_method"));

if (engine_args_to_idx.contains("access_key_id"))
auth_settings[S3AuthSetting::access_key_id] = checkAndGetLiteralArgument<String>(args[engine_args_to_idx["access_key_id"]], "access_key_id");
Expand Down Expand Up @@ -683,10 +683,10 @@ ASTPtr StorageS3Configuration::createArgsWithAccessData() const
arguments->children.push_back(std::make_shared<ASTLiteral>(auth_settings[S3AuthSetting::secret_access_key].value));
if (!auth_settings[S3AuthSetting::session_token].value.empty())
arguments->children.push_back(std::make_shared<ASTLiteral>(auth_settings[S3AuthSetting::session_token].value));
if (format != "auto")
arguments->children.push_back(std::make_shared<ASTLiteral>(format));
if (!compression_method.empty())
arguments->children.push_back(std::make_shared<ASTLiteral>(compression_method));
if (getFormat() != "auto")
arguments->children.push_back(std::make_shared<ASTLiteral>(getFormat()));
if (!getCompressionMethod().empty())
arguments->children.push_back(std::make_shared<ASTLiteral>(getCompressionMethod()));

if (!auth_settings[S3AuthSetting::role_arn].value.empty())
{
Expand Down
20 changes: 10 additions & 10 deletions src/Storages/ObjectStorage/StorageObjectStorage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ StorageObjectStorage::StorageObjectStorage(
, distributed_processing(distributed_processing_)
, log(getLogger(fmt::format("Storage{}({})", configuration->getEngineName(), table_id_.getFullTableName())))
{
bool do_lazy_init = lazy_init && !columns_.empty() && !configuration->format.empty();
bool do_lazy_init = lazy_init && !columns_.empty() && !configuration->getFormat().empty();
update_configuration_on_read = !is_table_function_ || do_lazy_init;
bool failed_init = false;
auto do_init = [&]()
Expand All @@ -115,7 +115,7 @@ StorageObjectStorage::StorageObjectStorage(
{
// If we don't have format or schema yet, we can't ignore failed configuration update,
// because relevant configuration is crucial for format and schema inference
if (mode <= LoadingStrictnessLevel::CREATE || columns_.empty() || (configuration->format == "auto"))
if (mode <= LoadingStrictnessLevel::CREATE || columns_.empty() || (configuration->getFormat() == "auto"))
{
throw;
}
Expand All @@ -132,7 +132,7 @@ StorageObjectStorage::StorageObjectStorage(

std::string sample_path;
ColumnsDescription columns{columns_};
resolveSchemaAndFormat(columns, configuration->format, object_storage, configuration, format_settings, sample_path, context);
resolveSchemaAndFormat(columns, object_storage, configuration, format_settings, sample_path, context);
configuration->check(context);

StorageInMemoryMetadata metadata;
Expand Down Expand Up @@ -172,17 +172,17 @@ String StorageObjectStorage::getName() const

/// Asks FormatFactory whether the configuration's output format prefers large blocks.
/// NOTE(review): the two `return` lines look like the before/after pair of a rendered diff
/// (direct `format` member access vs. the `getFormat()` accessor); as written the second one
/// is unreachable — confirm which form is the live code.
bool StorageObjectStorage::prefersLargeBlocks() const
{
return FormatFactory::instance().checkIfOutputFormatPrefersLargeBlocks(configuration->format);
return FormatFactory::instance().checkIfOutputFormatPrefersLargeBlocks(configuration->getFormat());
}

/// Asks FormatFactory, for the configuration's format and the given context,
/// whether output should be parallelized after reading.
/// NOTE(review): the two `return` lines look like the before/after pair of a rendered diff
/// (direct `format` member access vs. the `getFormat()` accessor); as written the second one
/// is unreachable — confirm which form is the live code.
bool StorageObjectStorage::parallelizeOutputAfterReading(ContextPtr context) const
{
return FormatFactory::instance().checkParallelizeOutputAfterReading(configuration->format, context);
return FormatFactory::instance().checkParallelizeOutputAfterReading(configuration->getFormat(), context);
}

/// Asks FormatFactory whether the configuration's format supports reading a subset of columns,
/// passing along the given context and this storage's format_settings.
/// NOTE(review): the two `return` lines look like the before/after pair of a rendered diff
/// (direct `format` member access vs. the `getFormat()` accessor); as written the second one
/// is unreachable — confirm which form is the live code.
bool StorageObjectStorage::supportsSubsetOfColumns(const ContextPtr & context) const
{
return FormatFactory::instance().checkIfFormatSupportsSubsetOfColumns(configuration->format, context, format_settings);
return FormatFactory::instance().checkIfFormatSupportsSubsetOfColumns(configuration->getFormat(), context, format_settings);
}

void StorageObjectStorage::Configuration::update(ObjectStoragePtr object_storage_ptr, ContextPtr context)
Expand Down Expand Up @@ -529,7 +529,7 @@ ColumnsDescription StorageObjectStorage::resolveSchemaFromData(

ObjectInfos read_keys;
auto iterator = createReadBufferIterator(object_storage, configuration, format_settings, read_keys, context);
auto schema = readSchemaFromFormat(configuration->format, format_settings, *iterator, context);
auto schema = readSchemaFromFormat(configuration->getFormat(), format_settings, *iterator, context);
sample_path = iterator->getLastFilePath();
return schema;
}
Expand All @@ -550,7 +550,7 @@ std::string StorageObjectStorage::resolveFormatFromData(

std::pair<ColumnsDescription, std::string> StorageObjectStorage::resolveSchemaAndFormatFromData(
const ObjectStoragePtr & object_storage,
const ConfigurationPtr & configuration,
ConfigurationPtr & configuration,
const std::optional<FormatSettings> & format_settings,
std::string & sample_path,
const ContextPtr & context)
Expand All @@ -559,13 +559,13 @@ std::pair<ColumnsDescription, std::string> StorageObjectStorage::resolveSchemaAn
auto iterator = createReadBufferIterator(object_storage, configuration, format_settings, read_keys, context);
auto [columns, format] = detectFormatAndReadSchema(format_settings, *iterator, context);
sample_path = iterator->getLastFilePath();
configuration->format = format;
configuration->setFormat(format);
return std::pair(columns, format);
}

/// Delegates to the configuration's addStructureAndFormatToArgsIfNeeded, passing an empty
/// structure string, the configuration's format, and with_structure=false.
/// NOTE(review): the two calls look like the before/after pair of a rendered diff
/// (direct `format` member access vs. the `getFormat()` accessor) — confirm which form is
/// the live code; as written the helper would be invoked twice.
void StorageObjectStorage::addInferredEngineArgsToCreateQuery(ASTs & args, const ContextPtr & context) const
{
configuration->addStructureAndFormatToArgsIfNeeded(args, "", configuration->format, context, /*with_structure=*/false);
configuration->addStructureAndFormatToArgsIfNeeded(args, "", configuration->getFormat(), context, /*with_structure=*/false);
}

SchemaCache & StorageObjectStorage::getSchemaCache(const ContextPtr & context, const std::string & storage_type_name)
Expand Down
Loading
Loading