diff --git a/src/Storages/ObjectStorage/Azure/Configuration.cpp b/src/Storages/ObjectStorage/Azure/Configuration.cpp index 2985be6c1fcb..3184c1308ef1 100644 --- a/src/Storages/ObjectStorage/Azure/Configuration.cpp +++ b/src/Storages/ObjectStorage/Azure/Configuration.cpp @@ -149,9 +149,9 @@ void StorageAzureConfiguration::fromNamedCollection(const NamedCollection & coll if (collection.has("account_key")) account_key = collection.get("account_key"); - structure = collection.getOrDefault("structure", "auto"); - format = collection.getOrDefault("format", format); - compression_method = collection.getOrDefault("compression_method", collection.getOrDefault("compression", "auto")); + setStructure(collection.getOrDefault("structure", "auto")); + setFormat(collection.getOrDefault("format", getFormat())); + setCompressionMethod(collection.getOrDefault("compression_method", collection.getOrDefault("compression", "auto"))); blobs_paths = {blob_path}; connection_params = getConnectionParams(connection_url, container_name, account_name, account_key, context); @@ -187,12 +187,12 @@ void StorageAzureConfiguration::fromAST(ASTs & engine_args, ContextPtr context, auto fourth_arg = checkAndGetLiteralArgument(engine_args[3], "format/account_name"); if (is_format_arg(fourth_arg)) { - format = fourth_arg; + setFormat(fourth_arg); } else { if (with_structure) - structure = fourth_arg; + setStructure(fourth_arg); else throw Exception( ErrorCodes::BAD_ARGUMENTS, @@ -204,8 +204,8 @@ void StorageAzureConfiguration::fromAST(ASTs & engine_args, ContextPtr context, auto fourth_arg = checkAndGetLiteralArgument(engine_args[3], "format/account_name"); if (is_format_arg(fourth_arg)) { - format = fourth_arg; - compression_method = checkAndGetLiteralArgument(engine_args[4], "compression"); + setFormat(fourth_arg); + setCompressionMethod(checkAndGetLiteralArgument(engine_args[4], "compression")); } else { @@ -220,9 +220,9 @@ void StorageAzureConfiguration::fromAST(ASTs & engine_args, ContextPtr context, { if (with_structure) { - format = fourth_arg; - compression_method = checkAndGetLiteralArgument(engine_args[4], "compression"); - structure = checkAndGetLiteralArgument(engine_args[5], "structure"); + setFormat(fourth_arg); + setCompressionMethod(checkAndGetLiteralArgument(engine_args[4], "compression")); + setStructure(checkAndGetLiteralArgument(engine_args[5], "structure")); } else throw Exception(ErrorCodes::BAD_ARGUMENTS, "Format and compression must be last arguments"); @@ -234,12 +234,12 @@ void StorageAzureConfiguration::fromAST(ASTs & engine_args, ContextPtr context, auto sixth_arg = checkAndGetLiteralArgument(engine_args[5], "format/structure"); if (is_format_arg(sixth_arg)) { - format = sixth_arg; + setFormat(sixth_arg); } else { if (with_structure) - structure = sixth_arg; + setStructure(sixth_arg); else throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unknown format {}", sixth_arg); } @@ -258,8 +258,8 @@ void StorageAzureConfiguration::fromAST(ASTs & engine_args, ContextPtr context, auto sixth_arg = checkAndGetLiteralArgument(engine_args[5], "format/account_name"); if (!is_format_arg(sixth_arg)) throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unknown format {}", sixth_arg); - format = sixth_arg; - compression_method = checkAndGetLiteralArgument(engine_args[6], "compression"); + setFormat(sixth_arg); + setCompressionMethod(checkAndGetLiteralArgument(engine_args[6], "compression")); } else if (with_structure && engine_args.size() == 8) { @@ -269,9 +269,9 @@ void StorageAzureConfiguration::fromAST(ASTs & engine_args, ContextPtr context, auto sixth_arg = checkAndGetLiteralArgument(engine_args[5], "format"); if (!is_format_arg(sixth_arg)) throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unknown format {}", sixth_arg); - format = sixth_arg; - compression_method = checkAndGetLiteralArgument(engine_args[6], "compression"); - structure = checkAndGetLiteralArgument(engine_args[7], "structure"); + setFormat(sixth_arg); + setCompressionMethod (checkAndGetLiteralArgument(engine_args[6], "compression")); + setStructure(checkAndGetLiteralArgument(engine_args[7], "structure")); } blobs_paths = {blob_path}; diff --git a/src/Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h b/src/Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h index 305c1c837576..80a8fedd289a 100644 --- a/src/Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h +++ b/src/Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h @@ -373,6 +373,14 @@ class StorageIcebergConfiguration : public StorageObjectStorage::Configuration, return getImpl().createArgsWithAccessData(); } + const String & getFormat() const override { return getImpl().getFormat(); } + const String & getCompressionMethod() const override { return getImpl().getCompressionMethod(); } + const String & getStructure() const override { return getImpl().getStructure(); } + + void setFormat(const String & format_) override { getImpl().setFormat(format_); } + void setCompressionMethod(const String & compression_method_) override { getImpl().setCompressionMethod(compression_method_); } + void setStructure(const String & structure_) override { getImpl().setStructure(structure_); } + const StorageObjectStorageSettings & getSettingsRef() const override { return getImpl().getSettingsRef(); diff --git a/src/Storages/ObjectStorage/DataLakes/HudiMetadata.cpp b/src/Storages/ObjectStorage/DataLakes/HudiMetadata.cpp index d8deea67cb12..cdfe432b7e62 100644 --- a/src/Storages/ObjectStorage/DataLakes/HudiMetadata.cpp +++ b/src/Storages/ObjectStorage/DataLakes/HudiMetadata.cpp @@ -44,7 +44,7 @@ Strings HudiMetadata::getDataFilesImpl() const { auto configuration_ptr = configuration.lock(); auto log = getLogger("HudiMetadata"); - const auto keys = listFiles(*object_storage, *configuration_ptr, "", Poco::toLower(configuration_ptr->format)); + const auto keys = listFiles(*object_storage, *configuration_ptr, "", Poco::toLower(configuration_ptr->getFormat())); using Partition = std::string; using FileID = std::string; diff --git a/src/Storages/ObjectStorage/HDFS/Configuration.cpp b/src/Storages/ObjectStorage/HDFS/Configuration.cpp index 304655dc700c..ec09b2f0e74e 100644 --- a/src/Storages/ObjectStorage/HDFS/Configuration.cpp +++ b/src/Storages/ObjectStorage/HDFS/Configuration.cpp @@ -111,23 +111,23 @@ void StorageHDFSConfiguration::fromAST(ASTs & args, ContextPtr context, bool wit if (args.size() > 1) { - format = checkAndGetLiteralArgument(args[1], "format_name"); + setFormat(checkAndGetLiteralArgument(args[1], "format_name")); } if (with_structure) { if (args.size() > 2) { - structure = checkAndGetLiteralArgument(args[2], "structure"); + setStructure(checkAndGetLiteralArgument(args[2], "structure")); } if (args.size() > 3) { - compression_method = checkAndGetLiteralArgument(args[3], "compression_method"); + setCompressionMethod(checkAndGetLiteralArgument(args[3], "compression_method")); } } else if (args.size() > 2) { - compression_method = checkAndGetLiteralArgument(args[2], "compression_method"); + setCompressionMethod(checkAndGetLiteralArgument(args[2], "compression_method")); } setURL(url_str); @@ -143,10 +143,10 @@ void StorageHDFSConfiguration::fromNamedCollection(const NamedCollection & colle else url_str = collection.get("url"); - format = collection.getOrDefault("format", "auto"); - compression_method = collection.getOrDefault("compression_method", - collection.getOrDefault("compression", "auto")); - structure = collection.getOrDefault("structure", "auto"); + setFormat(collection.getOrDefault("format", "auto")); + setCompressionMethod(collection.getOrDefault("compression_method", + collection.getOrDefault("compression", "auto"))); + setStructure(collection.getOrDefault("structure", "auto")); setURL(url_str); } diff --git a/src/Storages/ObjectStorage/Local/Configuration.cpp b/src/Storages/ObjectStorage/Local/Configuration.cpp index bae84714273c..c6dfa3318331 100644 --- a/src/Storages/ObjectStorage/Local/Configuration.cpp +++ b/src/Storages/ObjectStorage/Local/Configuration.cpp @@ -24,9 +24,9 @@ extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH; void StorageLocalConfiguration::fromNamedCollection(const NamedCollection & collection, ContextPtr) { path = collection.get("path"); - format = collection.getOrDefault("format", "auto"); - compression_method = collection.getOrDefault("compression_method", collection.getOrDefault("compression", "auto")); - structure = collection.getOrDefault("structure", "auto"); + setFormat(collection.getOrDefault("format", "auto")); + setCompressionMethod(collection.getOrDefault("compression_method", collection.getOrDefault("compression", "auto"))); + setStructure(collection.getOrDefault("structure", "auto")); paths = {path}; } @@ -46,23 +46,23 @@ void StorageLocalConfiguration::fromAST(ASTs & args, ContextPtr context, bool wi if (args.size() > 1) { - format = checkAndGetLiteralArgument(args[1], "format_name"); + setFormat(checkAndGetLiteralArgument(args[1], "format_name")); } if (with_structure) { if (args.size() > 2) { - structure = checkAndGetLiteralArgument(args[2], "structure"); + setStructure(checkAndGetLiteralArgument(args[2], "structure")); } if (args.size() > 3) { - compression_method = checkAndGetLiteralArgument(args[3], "compression_method"); + setCompressionMethod(checkAndGetLiteralArgument(args[3], "compression_method")); } } else if (args.size() > 2) { - compression_method = checkAndGetLiteralArgument(args[2], "compression_method"); + setCompressionMethod(checkAndGetLiteralArgument(args[2], "compression_method")); } paths = {path}; } diff --git a/src/Storages/ObjectStorage/ReadBufferIterator.cpp b/src/Storages/ObjectStorage/ReadBufferIterator.cpp index 2baed9ad4176..f71f04696a8d 100644 --- a/src/Storages/ObjectStorage/ReadBufferIterator.cpp +++ b/src/Storages/ObjectStorage/ReadBufferIterator.cpp @@ -38,8 +38,8 @@ ReadBufferIterator::ReadBufferIterator( , read_keys(read_keys_) , prev_read_keys_size(read_keys_.size()) { - if (configuration->format != "auto") - format = configuration->format; + if (configuration->getFormat() != "auto") + format = configuration->getFormat(); } SchemaCache::Key ReadBufferIterator::getKeyForSchemaCache(const ObjectInfo & object_info, const String & format_name) const @@ -148,7 +148,7 @@ std::unique_ptr ReadBufferIterator::recreateLastReadBuffer() auto impl = StorageObjectStorageSource::createReadBuffer(*current_object_info, object_storage, context, getLogger("ReadBufferIterator")); - const auto compression_method = chooseCompressionMethod(current_object_info->getFileName(), configuration->compression_method); + const auto compression_method = chooseCompressionMethod(current_object_info->getFileName(), configuration->getCompressionMethod()); const auto zstd_window = static_cast(context->getSettingsRef()[Setting::zstd_window_log_max]); return wrapReadBufferWithCompressionMethod(std::move(impl), compression_method, zstd_window); @@ -266,13 +266,13 @@ ReadBufferIterator::Data ReadBufferIterator::next() using ObjectInfoInArchive = StorageObjectStorageSource::ArchiveIterator::ObjectInfoInArchive; if (const auto * object_info_in_archive = dynamic_cast(current_object_info.get())) { - compression_method = chooseCompressionMethod(filename, configuration->compression_method); + compression_method = chooseCompressionMethod(filename, configuration->getCompressionMethod()); const auto & archive_reader = object_info_in_archive->archive_reader; read_buf = archive_reader->readFile(object_info_in_archive->path_in_archive, /*throw_on_not_found=*/true); } else { - compression_method = chooseCompressionMethod(filename, configuration->compression_method); + compression_method = chooseCompressionMethod(filename, configuration->getCompressionMethod()); read_buf = StorageObjectStorageSource::createReadBuffer(*current_object_info, object_storage, getContext(), getLogger("ReadBufferIterator")); } diff --git a/src/Storages/ObjectStorage/S3/Configuration.cpp b/src/Storages/ObjectStorage/S3/Configuration.cpp index b6762d4093be..bafc5a8d95db 100644 --- a/src/Storages/ObjectStorage/S3/Configuration.cpp +++ b/src/Storages/ObjectStorage/S3/Configuration.cpp @@ -186,9 +186,9 @@ void StorageS3Configuration::fromNamedCollection(const NamedCollection & collect auth_settings[S3AuthSetting::expiration_window_seconds] = collection.getOrDefault("expiration_window_seconds", S3::DEFAULT_EXPIRATION_WINDOW_SECONDS); auth_settings[S3AuthSetting::session_token] = collection.getOrDefault("session_token", ""); - format = collection.getOrDefault("format", format); - compression_method = collection.getOrDefault("compression_method", collection.getOrDefault("compression", "auto")); - structure = collection.getOrDefault("structure", "auto"); + setFormat(collection.getOrDefault("format", getFormat())); + setCompressionMethod(collection.getOrDefault("compression_method", collection.getOrDefault("compression", "auto"))); + setStructure(collection.getOrDefault("structure", "auto")); request_settings = S3::S3RequestSettings(collection, settings, /* validate_settings */true); @@ -433,14 +433,14 @@ void StorageS3Configuration::fromAST(ASTs & args, ContextPtr context, bool with_ /// Set format to configuration only of it's not 'auto', /// because we can have default format set in configuration. if (format_ != "auto") - format = format_; + setFormat(format_); } if (engine_args_to_idx.contains("structure")) - structure = checkAndGetLiteralArgument(args[engine_args_to_idx["structure"]], "structure"); + setStructure(checkAndGetLiteralArgument(args[engine_args_to_idx["structure"]], "structure")); if (engine_args_to_idx.contains("compression_method")) - compression_method = checkAndGetLiteralArgument(args[engine_args_to_idx["compression_method"]], "compression_method"); + setCompressionMethod(checkAndGetLiteralArgument(args[engine_args_to_idx["compression_method"]], "compression_method")); if (engine_args_to_idx.contains("access_key_id")) auth_settings[S3AuthSetting::access_key_id] = checkAndGetLiteralArgument(args[engine_args_to_idx["access_key_id"]], "access_key_id"); @@ -683,10 +683,10 @@ ASTPtr StorageS3Configuration::createArgsWithAccessData() const arguments->children.push_back(std::make_shared(auth_settings[S3AuthSetting::secret_access_key].value)); if (!auth_settings[S3AuthSetting::session_token].value.empty()) arguments->children.push_back(std::make_shared(auth_settings[S3AuthSetting::session_token].value)); - if (format != "auto") - arguments->children.push_back(std::make_shared(format)); - if (!compression_method.empty()) - arguments->children.push_back(std::make_shared(compression_method)); + if (getFormat() != "auto") + arguments->children.push_back(std::make_shared(getFormat())); + if (!getCompressionMethod().empty()) + arguments->children.push_back(std::make_shared(getCompressionMethod())); if (!auth_settings[S3AuthSetting::role_arn].value.empty()) { diff --git a/src/Storages/ObjectStorage/StorageObjectStorage.cpp b/src/Storages/ObjectStorage/StorageObjectStorage.cpp index e1b1fc3d7c11..97fcd9833f30 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorage.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorage.cpp @@ -99,7 +99,7 @@ StorageObjectStorage::StorageObjectStorage( , distributed_processing(distributed_processing_) , log(getLogger(fmt::format("Storage{}({})", configuration->getEngineName(), table_id_.getFullTableName()))) { - bool do_lazy_init = lazy_init && !columns_.empty() && !configuration->format.empty(); + bool do_lazy_init = lazy_init && !columns_.empty() && !configuration->getFormat().empty(); update_configuration_on_read = !is_table_function_ || do_lazy_init; bool failed_init = false; auto do_init = [&]() @@ -115,7 +115,7 @@ StorageObjectStorage::StorageObjectStorage( { // If we don't have format or schema yet, we can't ignore failed configuration update, // because relevant configuration is crucial for format and schema inference - if (mode <= LoadingStrictnessLevel::CREATE || columns_.empty() || (configuration->format == "auto")) + if (mode <= LoadingStrictnessLevel::CREATE || columns_.empty() || (configuration->getFormat() == "auto")) { throw; } @@ -132,7 +132,7 @@ StorageObjectStorage::StorageObjectStorage( std::string sample_path; ColumnsDescription columns{columns_}; - resolveSchemaAndFormat(columns, configuration->format, object_storage, configuration, format_settings, sample_path, context); + resolveSchemaAndFormat(columns, object_storage, configuration, format_settings, sample_path, context); configuration->check(context); StorageInMemoryMetadata metadata; @@ -172,17 +172,17 @@ String StorageObjectStorage::getName() const bool StorageObjectStorage::prefersLargeBlocks() const { - return FormatFactory::instance().checkIfOutputFormatPrefersLargeBlocks(configuration->format); + return FormatFactory::instance().checkIfOutputFormatPrefersLargeBlocks(configuration->getFormat()); } bool StorageObjectStorage::parallelizeOutputAfterReading(ContextPtr context) const { - return FormatFactory::instance().checkParallelizeOutputAfterReading(configuration->format, context); + return FormatFactory::instance().checkParallelizeOutputAfterReading(configuration->getFormat(), context); } bool StorageObjectStorage::supportsSubsetOfColumns(const ContextPtr & context) const { - return FormatFactory::instance().checkIfFormatSupportsSubsetOfColumns(configuration->format, context, format_settings); + return FormatFactory::instance().checkIfFormatSupportsSubsetOfColumns(configuration->getFormat(), context, format_settings); } void StorageObjectStorage::Configuration::update(ObjectStoragePtr object_storage_ptr, ContextPtr context) @@ -529,7 +529,7 @@ ColumnsDescription StorageObjectStorage::resolveSchemaFromData( ObjectInfos read_keys; auto iterator = createReadBufferIterator(object_storage, configuration, format_settings, read_keys, context); - auto schema = readSchemaFromFormat(configuration->format, format_settings, *iterator, context); + auto schema = readSchemaFromFormat(configuration->getFormat(), format_settings, *iterator, context); sample_path = iterator->getLastFilePath(); return schema; } @@ -550,7 +550,7 @@ std::string StorageObjectStorage::resolveFormatFromData( std::pair StorageObjectStorage::resolveSchemaAndFormatFromData( const ObjectStoragePtr & object_storage, - const ConfigurationPtr & configuration, + ConfigurationPtr & configuration, const std::optional & format_settings, std::string & sample_path, const ContextPtr & context) @@ -559,13 +559,13 @@ std::pair StorageObjectStorage::resolveSchemaAn auto iterator = createReadBufferIterator(object_storage, configuration, format_settings, read_keys, context); auto [columns, format] = detectFormatAndReadSchema(format_settings, *iterator, context); sample_path = iterator->getLastFilePath(); - configuration->format = format; + configuration->setFormat(format); return std::pair(columns, format); } void StorageObjectStorage::addInferredEngineArgsToCreateQuery(ASTs & args, const ContextPtr & context) const { - configuration->addStructureAndFormatToArgsIfNeeded(args, "", configuration->format, context, /*with_structure=*/false); + configuration->addStructureAndFormatToArgsIfNeeded(args, "", configuration->getFormat(), context, /*with_structure=*/false); } SchemaCache & StorageObjectStorage::getSchemaCache(const ContextPtr & context, const std::string & storage_type_name) diff --git a/src/Storages/ObjectStorage/StorageObjectStorage.h b/src/Storages/ObjectStorage/StorageObjectStorage.h index 5d68c7669082..dfcbd709af8c 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorage.h +++ b/src/Storages/ObjectStorage/StorageObjectStorage.h @@ -130,7 +130,7 @@ class StorageObjectStorage : public IStorage static std::pair resolveSchemaAndFormatFromData( const ObjectStoragePtr & object_storage, - const ConfigurationPtr & configuration, + ConfigurationPtr & configuration, const std::optional & format_settings, std::string & sample_path, const ContextPtr & context); @@ -256,10 +256,6 @@ class StorageObjectStorage::Configuration throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method iterate() is not implemented for configuration type {}", getTypeName()); } - String format = "auto"; - String compression_method = "auto"; - String structure = "auto"; - virtual void update(ObjectStoragePtr object_storage, ContextPtr local_context); virtual void updateIfRequired(ObjectStoragePtr object_storage, ContextPtr local_context); @@ -279,6 +275,19 @@ class StorageObjectStorage::Configuration virtual void assertInitialized() const; + virtual const String & getFormat() const { return format; } + virtual const String & getCompressionMethod() const { return compression_method; } + virtual const String & getStructure() const { return structure; } + + virtual void setFormat(const String & format_) { format = format_; } + virtual void setCompressionMethod(const String & compression_method_) { compression_method = compression_method_; } + virtual void setStructure(const String & structure_) { structure = structure_; } + +private: + String format = "auto"; + String compression_method = "auto"; + String structure = "auto"; + bool initialized = false; std::atomic updated = false; diff --git a/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp b/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp index 37f77dc96772..11f8fe0ec4bb 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp @@ -85,7 +85,7 @@ StorageObjectStorageCluster::StorageObjectStorageCluster( { ColumnsDescription columns{columns_}; std::string sample_path; - resolveSchemaAndFormat(columns, configuration->format, object_storage, configuration, {}, sample_path, context_); + resolveSchemaAndFormat(columns, object_storage, configuration, {}, sample_path, context_); configuration->check(context_); StorageInMemoryMetadata metadata; @@ -275,7 +275,7 @@ void StorageObjectStorageCluster::updateQueryToSendIfNeeded( configuration->extractDynamicStorageType(args, context, &object_storage_type_arg); if (cluster_name_in_settings) { - configuration->addStructureAndFormatToArgsIfNeeded(args, structure, configuration->format, context, /*with_structure=*/true); + configuration->addStructureAndFormatToArgsIfNeeded(args, structure, configuration->getFormat(), context, /*with_structure=*/true); /// Convert to old-stype *Cluster table function. /// This allows to use old clickhouse versions in cluster. @@ -330,7 +330,7 @@ void StorageObjectStorageCluster::updateQueryToSendIfNeeded( { ASTPtr cluster_name_arg = args.front(); args.erase(args.begin()); - configuration->addStructureAndFormatToArgsIfNeeded(args, structure, configuration->format, context, /*with_structure=*/true); + configuration->addStructureAndFormatToArgsIfNeeded(args, structure, configuration->getFormat(), context, /*with_structure=*/true); args.insert(args.begin(), cluster_name_arg); } if (object_storage_type_arg) @@ -429,7 +429,7 @@ void StorageObjectStorageCluster::truncate( void StorageObjectStorageCluster::addInferredEngineArgsToCreateQuery(ASTs & args, const ContextPtr & context) const { - configuration->addStructureAndFormatToArgsIfNeeded(args, "", configuration->format, context, /*with_structure=*/false); + configuration->addStructureAndFormatToArgsIfNeeded(args, "", configuration->getFormat(), context, /*with_structure=*/false); } } diff --git a/src/Storages/ObjectStorage/StorageObjectStorageCluster.h b/src/Storages/ObjectStorage/StorageObjectStorageCluster.h index ff073357c21f..ef216ac5c450 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageCluster.h +++ b/src/Storages/ObjectStorage/StorageObjectStorageCluster.h @@ -121,7 +121,7 @@ class StorageObjectStorageCluster : public IStorageCluster void updateQueryForDistributedEngineIfNeeded(ASTPtr & query, ContextPtr context); const String engine_name; - const StorageObjectStorage::ConfigurationPtr configuration; + StorageObjectStorage::ConfigurationPtr configuration; const ObjectStoragePtr object_storage; bool cluster_name_in_settings; diff --git a/src/Storages/ObjectStorage/StorageObjectStorageSink.cpp b/src/Storages/ObjectStorage/StorageObjectStorageSink.cpp index bd7cee407e62..263188301877 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageSink.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorageSink.cpp @@ -32,7 +32,7 @@ StorageObjectStorageSink::StorageObjectStorageSink( { const auto & settings = context->getSettingsRef(); const auto path = blob_path.empty() ? configuration->getPaths().back() : blob_path; - const auto chosen_compression_method = chooseCompressionMethod(path, configuration->compression_method); + const auto chosen_compression_method = chooseCompressionMethod(path, configuration->getCompressionMethod()); auto buffer = object_storage->writeObject( StoredObject(path), WriteMode::Rewrite, std::nullopt, DBMS_DEFAULT_BUFFER_SIZE, context->getWriteSettings()); @@ -44,7 +44,7 @@ StorageObjectStorageSink::StorageObjectStorageSink( static_cast(settings[Setting::output_format_compression_zstd_window_log])); writer = FormatFactory::instance().getOutputFormatParallelIfPossible( - configuration->format, *write_buf, sample_block, context, format_settings_); + configuration->getFormat(), *write_buf, sample_block, context, format_settings_); } void StorageObjectStorageSink::consume(Chunk & chunk) diff --git a/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp b/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp index f0c541eddff7..90f32664952e 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp @@ -342,7 +342,7 @@ Chunk StorageObjectStorageSource::generate() void StorageObjectStorageSource::addNumRowsToCache(const ObjectInfo & object_info, size_t num_rows) { const auto cache_key = getKeyForSchemaCache( - getUniqueStoragePathIdentifier(*configuration, object_info), configuration->format, format_settings, read_context); + getUniqueStoragePathIdentifier(*configuration, object_info), configuration->getFormat(), format_settings, read_context); schema_cache.addNumRows(cache_key, num_rows); } @@ -404,7 +404,7 @@ StorageObjectStorageSource::ReaderHolder StorageObjectStorageSource::createReade const auto cache_key = getKeyForSchemaCache( getUniqueStoragePathIdentifier(*configuration, *object_info), - configuration->format, + configuration->getFormat(), format_settings, context_); @@ -435,13 +435,13 @@ StorageObjectStorageSource::ReaderHolder StorageObjectStorageSource::createReade CompressionMethod compression_method; if (const auto * object_info_in_archive = dynamic_cast(object_info.get())) { - compression_method = chooseCompressionMethod(configuration->getPathInArchive(), configuration->compression_method); + compression_method = chooseCompressionMethod(configuration->getPathInArchive(), configuration->getCompressionMethod()); const auto & archive_reader = object_info_in_archive->archive_reader; read_buf = archive_reader->readFile(object_info_in_archive->path_in_archive, /*throw_on_not_found=*/true); } else { - compression_method = chooseCompressionMethod(object_info->getFileName(), configuration->compression_method); + compression_method = chooseCompressionMethod(object_info->getFileName(), configuration->getCompressionMethod()); read_buf = createReadBuffer(*object_info, object_storage, context_, log); } @@ -459,7 +459,7 @@ StorageObjectStorageSource::ReaderHolder StorageObjectStorageSource::createReade auto input_format = FormatFactory::instance().getInput( - configuration->format, + configuration->getFormat(), *read_buf, initial_header, context_, diff --git a/src/Storages/ObjectStorage/Utils.cpp b/src/Storages/ObjectStorage/Utils.cpp index 73410d959e0d..d4f152bfd582 100644 --- a/src/Storages/ObjectStorage/Utils.cpp +++ b/src/Storages/ObjectStorage/Utils.cpp @@ -45,24 +45,27 @@ std::optional checkAndGetNewFileOnInsertIfNeeded( void resolveSchemaAndFormat( ColumnsDescription & columns, - std::string & format, ObjectStoragePtr object_storage, - const StorageObjectStorage::ConfigurationPtr & configuration, + StorageObjectStorage::ConfigurationPtr configuration, std::optional format_settings, std::string & sample_path, const ContextPtr & context) { if (columns.empty()) { - if (format == "auto") + if (configuration->getFormat() == "auto") + { + std::string format; std::tie(columns, format) = StorageObjectStorage::resolveSchemaAndFormatFromData(object_storage, configuration, format_settings, sample_path, context); + configuration->setFormat(format); + } else columns = StorageObjectStorage::resolveSchemaFromData(object_storage, configuration, format_settings, sample_path, context); } - else if (format == "auto") + else if (configuration->getFormat() == "auto") { - format = StorageObjectStorage::resolveFormatFromData(object_storage, configuration, format_settings, sample_path, context); + configuration->setFormat(StorageObjectStorage::resolveFormatFromData(object_storage, configuration, format_settings, sample_path, context)); } if (!columns.hasOnlyOrdinary()) diff --git a/src/Storages/ObjectStorage/Utils.h b/src/Storages/ObjectStorage/Utils.h index 7ee14f509799..17e30babb709 100644 --- a/src/Storages/ObjectStorage/Utils.h +++ b/src/Storages/ObjectStorage/Utils.h @@ -15,9 +15,8 @@ std::optional checkAndGetNewFileOnInsertIfNeeded( void resolveSchemaAndFormat( ColumnsDescription & columns, - std::string & format, ObjectStoragePtr object_storage, - const StorageObjectStorage::ConfigurationPtr & configuration, + StorageObjectStorage::ConfigurationPtr configuration, std::optional format_settings, std::string & sample_path, const ContextPtr & context); diff --git a/src/Storages/ObjectStorageQueue/StorageObjectStorageQueue.cpp b/src/Storages/ObjectStorageQueue/StorageObjectStorageQueue.cpp index c09c53218f94..151a6c3c2ee8 100644 --- a/src/Storages/ObjectStorageQueue/StorageObjectStorageQueue.cpp +++ b/src/Storages/ObjectStorageQueue/StorageObjectStorageQueue.cpp @@ -222,12 +222,12 @@ StorageObjectStorageQueue::StorageObjectStorageQueue( validateSettings(*queue_settings_, is_attach); object_storage = configuration->createObjectStorage(context_, /* is_readonly */true); - FormatFactory::instance().checkFormatName(configuration->format); + FormatFactory::instance().checkFormatName(configuration->getFormat()); configuration->check(context_); ColumnsDescription columns{columns_}; std::string sample_path; - resolveSchemaAndFormat(columns, configuration->format, object_storage, configuration, format_settings, sample_path, context_); + resolveSchemaAndFormat(columns, object_storage, configuration, format_settings, sample_path, context_); configuration->check(context_); StorageInMemoryMetadata storage_metadata; @@ -242,7 +242,7 @@ StorageObjectStorageQueue::StorageObjectStorageQueue( LOG_INFO(log, "Using zookeeper path: {}", zk_path.string()); auto table_metadata = ObjectStorageQueueMetadata::syncWithKeeper( - zk_path, *queue_settings_, storage_metadata.getColumns(), configuration_->format, context_, is_attach, log); + zk_path, *queue_settings_, storage_metadata.getColumns(), configuration_->getFormat(), context_, is_attach, log); ObjectStorageType storage_type = engine_name == "S3Queue" ? ObjectStorageType::S3 : ObjectStorageType::Azure; @@ -302,7 +302,7 @@ void StorageObjectStorageQueue::drop() bool StorageObjectStorageQueue::supportsSubsetOfColumns(const ContextPtr & context_) const { - return FormatFactory::instance().checkIfFormatSupportsSubsetOfColumns(configuration->format, context_, format_settings); + return FormatFactory::instance().checkIfFormatSupportsSubsetOfColumns(configuration->getFormat(), context_, format_settings); } class ReadFromObjectStorageQueue : public SourceStepWithFilter diff --git a/src/Storages/ObjectStorageQueue/StorageObjectStorageQueue.h b/src/Storages/ObjectStorageQueue/StorageObjectStorageQueue.h index b851aba72614..2955137ec9ff 100644 --- a/src/Storages/ObjectStorageQueue/StorageObjectStorageQueue.h +++ b/src/Storages/ObjectStorageQueue/StorageObjectStorageQueue.h @@ -55,7 +55,7 @@ class StorageObjectStorageQueue : public IStorage, WithContext ContextPtr local_context, AlterLockHolder & table_lock_holder) override; - const auto & getFormatName() const { return configuration->format; } + const auto & getFormatName() const { return configuration->getFormat(); } const fs::path & getZooKeeperPath() const { return zk_path; } diff --git a/src/TableFunctions/TableFunctionObjectStorage.cpp b/src/TableFunctions/TableFunctionObjectStorage.cpp index 2456473ef7f7..97ef4d23faec 100644 --- a/src/TableFunctions/TableFunctionObjectStorage.cpp +++ b/src/TableFunctions/TableFunctionObjectStorage.cpp @@ -92,16 +92,16 @@ template ColumnsDescription TableFunctionObjectStorage< Definition, Configuration>::getActualTableStructure(ContextPtr context, bool is_insert_query) const { - if (configuration->structure == "auto") + if (configuration->getStructure() == "auto") { context->checkAccess(getSourceAccessType()); ColumnsDescription columns; auto storage = getObjectStorage(context, !is_insert_query); std::string sample_path; - resolveSchemaAndFormat(columns, configuration->format, storage, configuration, std::nullopt, sample_path, context); + resolveSchemaAndFormat(columns, storage, configuration, std::nullopt, sample_path, context); return columns; } - return parseColumnsListFromString(configuration->structure, context); + return parseColumnsListFromString(configuration->getStructure(), context); } template @@ -114,8 +114,8 @@ StoragePtr TableFunctionObjectStorage::executeImpl( { ColumnsDescription columns; chassert(configuration); - if (configuration->structure != "auto") - columns = parseColumnsListFromString(configuration->structure, context); + if (configuration->getStructure() != "auto") + columns = parseColumnsListFromString(configuration->getStructure(), context); else if (!structure_hint.empty()) columns = structure_hint; else if (!cached_columns.empty()) diff --git a/src/TableFunctions/TableFunctionObjectStorage.h b/src/TableFunctions/TableFunctionObjectStorage.h index 9bed13109c95..d234f5b3079d 100644 --- a/src/TableFunctions/TableFunctionObjectStorage.h +++ b/src/TableFunctions/TableFunctionObjectStorage.h @@ -112,15 +112,16 @@ class TableFunctionObjectStorage : public ITableFunction String getName() const override { return name; } - bool hasStaticStructure() const override { return configuration->structure != "auto"; } + bool hasStaticStructure() const override { return configuration->getStructure() != "auto"; } - bool needStructureHint() const override { return configuration->structure == "auto"; } + bool needStructureHint() const override { return configuration->getStructure() == "auto"; } void setStructureHint(const ColumnsDescription & structure_hint_) override { structure_hint = structure_hint_; } bool supportsReadingSubsetOfColumns(const ContextPtr & context) override { - return configuration->format != "auto" && FormatFactory::instance().checkIfFormatSupportsSubsetOfColumns(configuration->format, context); + return configuration->getFormat() != "auto" + && FormatFactory::instance().checkIfFormatSupportsSubsetOfColumns(configuration->getFormat(), context); } std::unordered_set getVirtualsToCheckBeforeUsingStructureHint() const override diff --git a/src/TableFunctions/TableFunctionObjectStorageCluster.cpp b/src/TableFunctions/TableFunctionObjectStorageCluster.cpp index cd6159bfdf31..c30ce59e330c 100644 --- a/src/TableFunctions/TableFunctionObjectStorageCluster.cpp +++ b/src/TableFunctions/TableFunctionObjectStorageCluster.cpp @@ -23,8 +23,8 @@ StoragePtr TableFunctionObjectStorageCluster::execute ColumnsDescription columns; - if (configuration->structure != "auto") - columns = parseColumnsListFromString(configuration->structure, context); + if (configuration->getStructure() != "auto") + columns = parseColumnsListFromString(configuration->getStructure(), context); else if (!Base::structure_hint.empty()) columns = Base::structure_hint; else if (!cached_columns.empty()) diff --git a/src/TableFunctions/TableFunctionObjectStorageCluster.h b/src/TableFunctions/TableFunctionObjectStorageCluster.h index a363e9a81c2d..d03f7198d359 100644 --- a/src/TableFunctions/TableFunctionObjectStorageCluster.h +++ b/src/TableFunctions/TableFunctionObjectStorageCluster.h @@ -95,9 +95,9 @@ class TableFunctionObjectStorageCluster : public ITableFunctionClusterstructure != "auto"; } + bool hasStaticStructure() const override { return Base::getConfiguration()->getStructure() != "auto"; } - bool needStructureHint() const override { return Base::getConfiguration()->structure == "auto"; } + bool needStructureHint() const override { return Base::getConfiguration()->getStructure() == "auto"; } void setStructureHint(const ColumnsDescription & structure_hint_) override { Base::structure_hint = structure_hint_; } };