From 0693e7160e34932d4c3de4ccb6f0492460d8bbd5 Mon Sep 17 00:00:00 2001 From: Vasily Nemkov Date: Tue, 9 Sep 2025 21:48:58 +0200 Subject: [PATCH 1/7] Merge pull request #952 from Altinity/feature/antalya-25.6.5/rendezvous_hashing 25.6.5 Antalya port of #709, #760, #866 - Rendezvous hashing --- src/Core/Settings.cpp | 3 + src/Core/SettingsChangesHistory.cpp | 7 + src/Disks/ObjectStorages/IObjectStorage.cpp | 36 ++++ src/Disks/ObjectStorages/IObjectStorage.h | 33 ++- .../StorageObjectStorageCluster.cpp | 17 +- .../StorageObjectStorageSource.cpp | 25 ++- ...rageObjectStorageStableTaskDistributor.cpp | 61 +++++- ...torageObjectStorageStableTaskDistributor.h | 15 +- .../tests/gtest_rendezvous_hashing.cpp | 18 +- .../test_s3_cache_locality/__init__.py | 0 .../configs/cluster.xml | 126 +++++++++++ .../configs/named_collections.xml | 10 + .../test_s3_cache_locality/configs/users.xml | 9 + .../test_s3_cache_locality/test.py | 196 ++++++++++++++++++ 14 files changed, 529 insertions(+), 27 deletions(-) create mode 100644 tests/integration/test_s3_cache_locality/__init__.py create mode 100644 tests/integration/test_s3_cache_locality/configs/cluster.xml create mode 100644 tests/integration/test_s3_cache_locality/configs/named_collections.xml create mode 100644 tests/integration/test_s3_cache_locality/configs/users.xml create mode 100644 tests/integration/test_s3_cache_locality/test.py diff --git a/src/Core/Settings.cpp b/src/Core/Settings.cpp index 497760a1325d..78799bf884f8 100644 --- a/src/Core/Settings.cpp +++ b/src/Core/Settings.cpp @@ -7001,6 +7001,9 @@ Default number of tasks for parallel reading in distributed query. Tasks are spr DECLARE(Bool, distributed_plan_optimize_exchanges, true, R"( Removes unnecessary exchanges in distributed query plan. Disable it for debugging. )", 0) \ + DECLARE(UInt64, lock_object_storage_task_distribution_ms, 0, R"( +In object storage distribution queries do not distibute tasks on non-prefetched nodes until prefetched node is active. +)", EXPERIMENTAL) \ DECLARE(String, distributed_plan_force_exchange_kind, "", R"( Force specified kind of Exchange operators between distributed query stages. diff --git a/src/Core/SettingsChangesHistory.cpp b/src/Core/SettingsChangesHistory.cpp index 1ba731c37bbf..b39ab27b41a2 100644 --- a/src/Core/SettingsChangesHistory.cpp +++ b/src/Core/SettingsChangesHistory.cpp @@ -132,6 +132,13 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory() {"distributed_plan_force_shuffle_aggregation", 0, 0, "New experimental setting"}, {"allow_experimental_insert_into_iceberg", false, false, "New setting."}, /// RELEASE CLOSED + {"allow_experimental_database_iceberg", false, true, "Turned ON by default for Antalya"}, + {"allow_experimental_database_unity_catalog", false, true, "Turned ON by default for Antalya"}, + {"allow_experimental_database_glue_catalog", false, true, "Turned ON by default for Antalya"}, + {"output_format_parquet_enum_as_byte_array", true, true, "Enable writing Enum as byte array in Parquet by default"}, + {"lock_object_storage_task_distribution_ms", 0, 0, "New setting."}, + {"object_storage_cluster", "", "", "New setting"}, + {"object_storage_max_nodes", 0, 0, "New setting"}, }); addSettingsChanges(settings_changes_history, "25.6", { diff --git a/src/Disks/ObjectStorages/IObjectStorage.cpp b/src/Disks/ObjectStorages/IObjectStorage.cpp index acbd3e1fa2c0..4b8999cafa44 100644 --- a/src/Disks/ObjectStorages/IObjectStorage.cpp +++ b/src/Disks/ObjectStorages/IObjectStorage.cpp @@ -8,6 +8,10 @@ #include #include +#include +#include +#include + namespace DB { @@ -104,4 +108,36 @@ std::string RelativePathWithMetadata::getPathOrPathToArchiveIfArchive() const return getPath(); } +RelativePathWithMetadata::CommandInTaskResponse::CommandInTaskResponse(const std::string & task) +{ + Poco::JSON::Parser parser; + try + { + auto json = parser.parse(task).extract(); + if (!json) + return; + + successfully_parsed = true; + + if (json->has("retry_after_us")) + retry_after_us = json->getValue("retry_after_us"); + } + catch (const Poco::JSON::JSONException &) + { /// Not a JSON + return; + } +} + +std::string RelativePathWithMetadata::CommandInTaskResponse::to_string() const +{ + Poco::JSON::Object json; + if (retry_after_us.has_value()) + json.set("retry_after_us", retry_after_us.value()); + + std::ostringstream oss; + oss.exceptions(std::ios::failbit); + Poco::JSON::Stringifier::stringify(json, oss); + return oss.str(); +} + } diff --git a/src/Disks/ObjectStorages/IObjectStorage.h b/src/Disks/ObjectStorages/IObjectStorage.h index 1f3a4278f135..0712ca0452da 100644 --- a/src/Disks/ObjectStorages/IObjectStorage.h +++ b/src/Disks/ObjectStorages/IObjectStorage.h @@ -111,18 +111,41 @@ struct DataLakeObjectMetadata; struct RelativePathWithMetadata { + class CommandInTaskResponse + { + public: + CommandInTaskResponse() = default; + explicit CommandInTaskResponse(const std::string & task); + + bool is_parsed() const { return successfully_parsed; } + void set_retry_after_us(Poco::Timestamp::TimeDiff time_us) { retry_after_us = time_us; } + + std::string to_string() const; + + std::optional get_retry_after_us() const { return retry_after_us; } + + private: + bool successfully_parsed = false; + std::optional retry_after_us; + }; + String relative_path; /// Object metadata: size, modification time, etc. std::optional metadata; /// Delta lake related object metadata. std::optional data_lake_metadata; + /// Retry request after short pause + CommandInTaskResponse command; RelativePathWithMetadata() = default; - explicit RelativePathWithMetadata(String relative_path_, std::optional metadata_ = std::nullopt) - : relative_path(std::move(relative_path_)) - , metadata(std::move(metadata_)) - {} + explicit RelativePathWithMetadata(const String & task_string, std::optional metadata_ = std::nullopt) + : metadata(std::move(metadata_)) + , command(task_string) + { + if (!command.is_parsed()) + relative_path = task_string; + } RelativePathWithMetadata(const RelativePathWithMetadata & other) = default; @@ -134,6 +157,8 @@ struct RelativePathWithMetadata virtual std::string getPathToArchive() const { throw Exception(ErrorCodes::LOGICAL_ERROR, "Not an archive"); } virtual size_t fileSizeInArchive() const { throw Exception(ErrorCodes::LOGICAL_ERROR, "Not an archive"); } virtual std::string getPathOrPathToArchiveIfArchive() const; + + const CommandInTaskResponse & getCommand() const { return command; } }; struct ObjectKeyWithMetadata diff --git a/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp b/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp index ea529021429e..dde4935372d5 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp @@ -24,11 +24,13 @@ namespace Setting { extern const SettingsBool use_hive_partitioning; extern const SettingsBool cluster_function_process_archive_on_multiple_nodes; + extern const SettingsUInt64 lock_object_storage_task_distribution_ms; } namespace ErrorCodes { extern const int LOGICAL_ERROR; + extern const int INVALID_SETTING_VALUE; } String StorageObjectStorageCluster::getPathSample(ContextPtr context) @@ -224,10 +226,23 @@ RemoteQueryExecutor::Extension StorageObjectStorageCluster::getTaskIteratorExten } } + uint64_t lock_object_storage_task_distribution_ms = local_context->getSettingsRef()[Setting::lock_object_storage_task_distribution_ms]; + + /// Check value to avoid negative result after conversion in microseconds. + /// Poco::Timestamp::TimeDiff is signed int 64. + static const uint64_t lock_object_storage_task_distribution_ms_max = 0x0020000000000000ULL; + if (lock_object_storage_task_distribution_ms > lock_object_storage_task_distribution_ms_max) + throw Exception(ErrorCodes::INVALID_SETTING_VALUE, + "Value lock_object_storage_task_distribution_ms is too big: {}, allowed maximum is {}", + lock_object_storage_task_distribution_ms, + lock_object_storage_task_distribution_ms_max + ); + auto task_distributor = std::make_shared( iterator, std::move(ids_of_hosts), - /* send_over_whole_archive */!local_context->getSettingsRef()[Setting::cluster_function_process_archive_on_multiple_nodes]); + /* send_over_whole_archive */!local_context->getSettingsRef()[Setting::cluster_function_process_archive_on_multiple_nodes], + lock_object_storage_task_distribution_ms); auto callback = std::make_shared( [task_distributor, local_context](size_t number_of_current_replica) mutable -> ClusterFunctionReadTaskResponsePtr diff --git a/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp b/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp index db47a7fc7945..7823fd0a1022 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp @@ -27,6 +27,7 @@ #include #include #include +#include #include #include #include @@ -445,11 +446,31 @@ StorageObjectStorageSource::ReaderHolder StorageObjectStorageSource::createReade ObjectInfoPtr object_info; auto query_settings = configuration->getQuerySettings(context_); + bool not_a_path = false; + do { + not_a_path = false; object_info = file_iterator->next(processor); - if (!object_info || object_info->getPath().empty()) + if (!object_info) + return {}; + + if (object_info->getCommand().is_parsed()) + { + auto retry_after_us = object_info->getCommand().get_retry_after_us(); + if (retry_after_us.has_value()) + { + not_a_path = true; + /// TODO: Make asyncronous waiting without sleep in thread + /// Now this sleep is on executor node in worker thread + /// Does not block query initiator + sleepForMicroseconds(std::min(Poco::Timestamp::TimeDiff(100000ul), retry_after_us.value())); + continue; + } + } + + if (object_info->getPath().empty()) return {}; if (!object_info->metadata) @@ -468,7 +489,7 @@ StorageObjectStorageSource::ReaderHolder StorageObjectStorageSource::createReade object_info->metadata = object_storage->getObjectMetadata(path); } } - while (query_settings.skip_empty_files && object_info->metadata->size_bytes == 0); + while (not_a_path || (query_settings.skip_empty_files && object_info->metadata->size_bytes == 0)); QueryPipelineBuilder builder; std::shared_ptr source; diff --git a/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp b/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp index 37b966cd7bbc..01322ac16fa2 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp @@ -14,11 +14,13 @@ namespace ErrorCodes StorageObjectStorageStableTaskDistributor::StorageObjectStorageStableTaskDistributor( std::shared_ptr iterator_, std::vector && ids_of_nodes_, - bool send_over_whole_archive_) + bool send_over_whole_archive_, + uint64_t lock_object_storage_task_distribution_ms_) : iterator(std::move(iterator_)) , send_over_whole_archive(send_over_whole_archive_) , connection_to_files(ids_of_nodes_.size()) , ids_of_nodes(std::move(ids_of_nodes_)) + , lock_object_storage_task_distribution_us(lock_object_storage_task_distribution_ms_ * 1000) , iterator_exhausted(false) { } @@ -27,6 +29,8 @@ ObjectInfoPtr StorageObjectStorageStableTaskDistributor::getNextTask(size_t numb { LOG_TRACE(log, "Received request from replica {} looking for a file", number_of_current_replica); + saveLastNodeActivity(number_of_current_replica); + // 1. Check pre-queued files first if (auto file = getPreQueuedFile(number_of_current_replica)) return file; @@ -159,7 +163,7 @@ ObjectInfoPtr StorageObjectStorageStableTaskDistributor::getMatchingFileFromIter // Queue file for its assigned replica { std::lock_guard lock(mutex); - unprocessed_files.emplace(file_path, object_info); + unprocessed_files.emplace(file_path, std::make_pair(object_info, number_of_current_replica)); connection_to_files[file_replica_idx].push_back(object_info); } } @@ -169,26 +173,65 @@ ObjectInfoPtr StorageObjectStorageStableTaskDistributor::getMatchingFileFromIter ObjectInfoPtr StorageObjectStorageStableTaskDistributor::getAnyUnprocessedFile(size_t number_of_current_replica) { + /// Limit time of node activity to keep task in queue + Poco::Timestamp activity_limit; + Poco::Timestamp oldest_activity; + if (lock_object_storage_task_distribution_us > 0) + activity_limit -= lock_object_storage_task_distribution_us; + std::lock_guard lock(mutex); if (!unprocessed_files.empty()) { auto it = unprocessed_files.begin(); - auto next_file = it->second; - unprocessed_files.erase(it); - auto file_path = send_over_whole_archive ? next_file->getPathOrPathToArchiveIfArchive() : next_file->getPath(); + while (it != unprocessed_files.end()) + { + auto last_activity = last_node_activity.find(it->second.second); + if (lock_object_storage_task_distribution_us <= 0 + || last_activity == last_node_activity.end() + || activity_limit > last_activity->second) + { + auto next_file = it->second.first; + unprocessed_files.erase(it); + + auto file_path = send_over_whole_archive ? next_file->getPathOrPathToArchiveIfArchive() : next_file->getPath(); + LOG_TRACE( + log, + "Iterator exhausted. Assigning unprocessed file {} to replica {}", + file_path, + number_of_current_replica + ); + + return next_file; + } + + oldest_activity = std::min(oldest_activity, last_activity->second); + ++it; + } + LOG_TRACE( log, - "Iterator exhausted. Assigning unprocessed file {} to replica {}", - file_path, - number_of_current_replica + "No unprocessed file for replica {}, need to retry after {} us", + number_of_current_replica, + oldest_activity - activity_limit ); - return next_file; + /// All unprocessed files owned by alive replicas with recenlty activity + /// Need to retry after (oldest_activity - activity_limit) microseconds + RelativePathWithMetadata::CommandInTaskResponse response; + response.set_retry_after_us(oldest_activity - activity_limit); + return std::make_shared(response.to_string()); } return {}; } +void StorageObjectStorageStableTaskDistributor::saveLastNodeActivity(size_t number_of_current_replica) +{ + Poco::Timestamp now; + std::lock_guard lock(mutex); + last_node_activity[number_of_current_replica] = now; +} + } diff --git a/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.h b/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.h index 02d3ba7a030f..a65adfad8cbe 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.h +++ b/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.h @@ -4,7 +4,12 @@ #include #include #include +#include + +#include + #include +#include #include #include #include @@ -18,7 +23,8 @@ class StorageObjectStorageStableTaskDistributor StorageObjectStorageStableTaskDistributor( std::shared_ptr iterator_, std::vector && ids_of_nodes_, - bool send_over_whole_archive_); + bool send_over_whole_archive_, + uint64_t lock_object_storage_task_distribution_ms_); ObjectInfoPtr getNextTask(size_t number_of_current_replica); @@ -28,14 +34,19 @@ class StorageObjectStorageStableTaskDistributor ObjectInfoPtr getMatchingFileFromIterator(size_t number_of_current_replica); ObjectInfoPtr getAnyUnprocessedFile(size_t number_of_current_replica); + void saveLastNodeActivity(size_t number_of_current_replica); + const std::shared_ptr iterator; const bool send_over_whole_archive; std::vector> connection_to_files; - std::unordered_map unprocessed_files; + std::unordered_map> unprocessed_files; std::vector ids_of_nodes; + std::unordered_map last_node_activity; + Poco::Timestamp::TimeDiff lock_object_storage_task_distribution_us; + std::mutex mutex; bool iterator_exhausted = false; diff --git a/src/Storages/ObjectStorage/tests/gtest_rendezvous_hashing.cpp b/src/Storages/ObjectStorage/tests/gtest_rendezvous_hashing.cpp index b00c1d609fa1..47a45d925ebf 100644 --- a/src/Storages/ObjectStorage/tests/gtest_rendezvous_hashing.cpp +++ b/src/Storages/ObjectStorage/tests/gtest_rendezvous_hashing.cpp @@ -101,7 +101,7 @@ TEST(RendezvousHashing, SingleNode) { auto iterator = makeIterator(); std::vector replicas = {"replica0", "replica1", "replica2", "replica3"}; - StorageObjectStorageStableTaskDistributor distributor(iterator, std::move(replicas), false); + StorageObjectStorageStableTaskDistributor distributor(iterator, std::move(replicas), false, 0); std::vector paths; ASSERT_TRUE(extractNForReplica(distributor, paths, 0, 10)); ASSERT_TRUE(checkHead(paths, {6})); @@ -110,7 +110,7 @@ TEST(RendezvousHashing, SingleNode) { auto iterator = makeIterator(); std::vector replicas = {"replica0", "replica1", "replica2", "replica3"}; - StorageObjectStorageStableTaskDistributor distributor(iterator, std::move(replicas), false); + StorageObjectStorageStableTaskDistributor distributor(iterator, std::move(replicas), false, 0); std::vector paths; ASSERT_TRUE(extractNForReplica(distributor, paths, 1, 10)); ASSERT_TRUE(checkHead(paths, {0, 2, 4})); @@ -119,7 +119,7 @@ TEST(RendezvousHashing, SingleNode) { auto iterator = makeIterator(); std::vector replicas = {"replica0", "replica1", "replica2", "replica3"}; - StorageObjectStorageStableTaskDistributor distributor(iterator, std::move(replicas), false); + StorageObjectStorageStableTaskDistributor distributor(iterator, std::move(replicas), false, 0); std::vector paths; ASSERT_TRUE(extractNForReplica(distributor, paths, 2, 10)); ASSERT_TRUE(checkHead(paths, {1, 5, 7, 8})); @@ -128,7 +128,7 @@ TEST(RendezvousHashing, SingleNode) { auto iterator = makeIterator(); std::vector replicas = {"replica0", "replica1", "replica2", "replica3"}; - StorageObjectStorageStableTaskDistributor distributor(iterator, std::move(replicas), false); + StorageObjectStorageStableTaskDistributor distributor(iterator, std::move(replicas), false, 0); std::vector paths; ASSERT_TRUE(extractNForReplica(distributor, paths, 3, 10)); ASSERT_TRUE(checkHead(paths, {3, 9})); @@ -139,7 +139,7 @@ TEST(RendezvousHashing, MultipleNodes) { auto iterator = makeIterator(); std::vector replicas = {"replica0", "replica1", "replica2", "replica3"}; - StorageObjectStorageStableTaskDistributor distributor(iterator, std::move(replicas), false); + StorageObjectStorageStableTaskDistributor distributor(iterator, std::move(replicas), false, 0); { std::vector paths; @@ -171,7 +171,7 @@ TEST(RendezvousHashing, SingleNodeReducedCluster) { auto iterator = makeIterator(); std::vector replicas = {"replica2", "replica1"}; - StorageObjectStorageStableTaskDistributor distributor(iterator, std::move(replicas), false); + StorageObjectStorageStableTaskDistributor distributor(iterator, std::move(replicas), false, 0); std::vector paths; ASSERT_TRUE(extractNForReplica(distributor, paths, 0, 10)); ASSERT_TRUE(checkHead(paths, {1, 5, 6, 7, 8, 9})); @@ -180,7 +180,7 @@ TEST(RendezvousHashing, SingleNodeReducedCluster) { auto iterator = makeIterator(); std::vector replicas = {"replica2", "replica1"}; - StorageObjectStorageStableTaskDistributor distributor(iterator, std::move(replicas), false); + StorageObjectStorageStableTaskDistributor distributor(iterator, std::move(replicas), false, 0); std::vector paths; ASSERT_TRUE(extractNForReplica(distributor, paths, 1, 10)); ASSERT_TRUE(checkHead(paths, {0, 2, 3, 4})); @@ -191,7 +191,7 @@ TEST(RendezvousHashing, MultipleNodesReducedCluster) { auto iterator = makeIterator(); std::vector replicas = {"replica2", "replica1"}; - StorageObjectStorageStableTaskDistributor distributor(iterator, std::move(replicas), false); + StorageObjectStorageStableTaskDistributor distributor(iterator, std::move(replicas), false, 0); { std::vector paths; @@ -210,7 +210,7 @@ TEST(RendezvousHashing, MultipleNodesReducedClusterOneByOne) { auto iterator = makeIterator(); std::vector replicas = {"replica2", "replica1"}; - StorageObjectStorageStableTaskDistributor distributor(iterator, std::move(replicas), false); + StorageObjectStorageStableTaskDistributor distributor(iterator, std::move(replicas), false, 0); std::vector paths0; std::vector paths1; diff --git a/tests/integration/test_s3_cache_locality/__init__.py b/tests/integration/test_s3_cache_locality/__init__.py new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/tests/integration/test_s3_cache_locality/configs/cluster.xml b/tests/integration/test_s3_cache_locality/configs/cluster.xml new file mode 100644 index 000000000000..db54c35374b9 --- /dev/null +++ b/tests/integration/test_s3_cache_locality/configs/cluster.xml @@ -0,0 +1,126 @@ + + + + + + + + clickhouse1 + 9000 + + + clickhouse2 + 9000 + + + clickhouse3 + 9000 + + + clickhouse4 + 9000 + + + clickhouse5 + 9000 + + + + + + + + clickhouse1 + 9000 + + + clickhouse2 + 9000 + + + clickhouse3 + 9000 + + + clickhouse4 + 9000 + + + + + + + + clickhouse2 + 9000 + + + clickhouse3 + 9000 + + + clickhouse4 + 9000 + + + clickhouse5 + 9000 + + + + + + + + clickhouse3 + 9000 + + + clickhouse4 + 9000 + + + clickhouse5 + 9000 + + + clickhouse1 + 9000 + + + clickhouse2 + 9000 + + + + + + + + clickhouse4 + 9000 + + + clickhouse5 + 9000 + + + clickhouse2 + 9000 + + + clickhouse3 + 9000 + + + + + + + + + /var/lib/clickhouse/raw_s3_cache + 10Gi + + + diff --git a/tests/integration/test_s3_cache_locality/configs/named_collections.xml b/tests/integration/test_s3_cache_locality/configs/named_collections.xml new file mode 100644 index 000000000000..6994aa3f5e77 --- /dev/null +++ b/tests/integration/test_s3_cache_locality/configs/named_collections.xml @@ -0,0 +1,10 @@ + + + + http://minio1:9001/root/data/* + minio + ClickHouse_Minio_P@ssw0rd + CSV> + + + diff --git a/tests/integration/test_s3_cache_locality/configs/users.xml b/tests/integration/test_s3_cache_locality/configs/users.xml new file mode 100644 index 000000000000..4b6ba057ecb1 --- /dev/null +++ b/tests/integration/test_s3_cache_locality/configs/users.xml @@ -0,0 +1,9 @@ + + + + + default + 1 + + + diff --git a/tests/integration/test_s3_cache_locality/test.py b/tests/integration/test_s3_cache_locality/test.py new file mode 100644 index 000000000000..da85e78a5643 --- /dev/null +++ b/tests/integration/test_s3_cache_locality/test.py @@ -0,0 +1,196 @@ +import csv +import logging +import os +import shutil +import uuid + +import pytest + +from helpers.cluster import ClickHouseCluster +from helpers.config_cluster import minio_secret_key + + +logging.getLogger().setLevel(logging.INFO) +logging.getLogger().addHandler(logging.StreamHandler()) + +SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__)) + + +def create_buckets_s3(cluster, files=1000): + minio = cluster.minio_client + + s3_data = [] + + for file_number in range(files): + file_name = f"data/generated/file_{file_number}.csv" + os.makedirs(os.path.join(SCRIPT_DIR, "data/generated/"), exist_ok=True) + s3_data.append(file_name) + with open(os.path.join(SCRIPT_DIR, file_name), "w+", encoding="utf-8") as f: + # a String, b UInt64 + data = [] + + # Make all files a bit different + data.append( + ["str_" + str(file_number), file_number] + ) + + writer = csv.writer(f) + writer.writerows(data) + + for file in s3_data: + minio.fput_object( + bucket_name=cluster.minio_bucket, + object_name=file, + file_path=os.path.join(SCRIPT_DIR, file), + ) + + for obj in minio.list_objects(cluster.minio_bucket, recursive=True): + print(obj.object_name) + + +@pytest.fixture(scope="module") +def started_cluster(): + try: + cluster = ClickHouseCluster(__file__) + # clickhouse0 not a member of cluster_XXX + for i in range(6): + cluster.add_instance( + f"clickhouse{i}", + main_configs=["configs/cluster.xml", "configs/named_collections.xml"], + user_configs=["configs/users.xml"], + macros={"replica": f"clickhouse{i}"}, + with_minio=True, + with_zookeeper=True, + stay_alive=True, + ) + + logging.info("Starting cluster...") + cluster.start() + logging.info("Cluster started") + + create_buckets_s3(cluster) + + yield cluster + finally: + shutil.rmtree(os.path.join(SCRIPT_DIR, "data/generated/"), ignore_errors=True) + cluster.shutdown() + + +def check_s3_gets(cluster, node, expected_result, cluster_first, cluster_second, enable_filesystem_cache, + lock_object_storage_task_distribution_ms): + for host in list(cluster.instances.values()): + host.query("SYSTEM DROP FILESYSTEM CACHE 'raw_s3_cache'", ignore_error=True) + + settings = { + "enable_filesystem_cache": enable_filesystem_cache, + "filesystem_cache_name": "'raw_s3_cache'", + } + + if lock_object_storage_task_distribution_ms > 0: + settings["lock_object_storage_task_distribution_ms"] = lock_object_storage_task_distribution_ms + + query_id_first = str(uuid.uuid4()) + result_first = node.query( + f""" + SELECT count(*) + FROM s3Cluster('{cluster_first}', 'http://minio1:9001/root/data/generated/*', 'minio', '{minio_secret_key}', 'CSV', 'a String, b UInt64') + WHERE b=42 + SETTINGS {",".join(f"{k}={v}" for k, v in settings.items())} + """, + query_id=query_id_first, + ) + assert result_first == expected_result + query_id_second = str(uuid.uuid4()) + result_second = node.query( + f""" + SELECT count(*) + FROM s3Cluster('{cluster_second}', 'http://minio1:9001/root/data/generated/*', 'minio', '{minio_secret_key}', 'CSV', 'a String, b UInt64') + WHERE b=42 + SETTINGS {",".join(f"{k}={v}" for k, v in settings.items())} + """, + query_id=query_id_second, + ) + assert result_second == expected_result + + node.query(f"SYSTEM FLUSH LOGS ON CLUSTER {cluster_first}") + node.query(f"SYSTEM FLUSH LOGS ON CLUSTER {cluster_second}") + + s3_get_first = node.query( + f""" + SELECT sum(ProfileEvents['S3GetObject']) + FROM clusterAllReplicas('{cluster_first}', system.query_log) + WHERE type='QueryFinish' + AND initial_query_id='{query_id_first}' + """, + ) + s3_get_second = node.query( + f""" + SELECT sum(ProfileEvents['S3GetObject']) + FROM clusterAllReplicas('{cluster_second}', system.query_log) + WHERE type='QueryFinish' + AND initial_query_id='{query_id_second}' + """, + ) + + return int(s3_get_first), int(s3_get_second) + + +def check_s3_gets_repeat(cluster, node, expected_result, cluster_first, cluster_second, enable_filesystem_cache, + lock_object_storage_task_distribution_ms): + # Repeat test several times to get average result + iterations = 1 if lock_object_storage_task_distribution_ms > 0 else 10 + s3_get_first_sum = 0 + s3_get_second_sum = 0 + for _ in range(iterations): + (s3_get_first, s3_get_second) = check_s3_gets(cluster, node, expected_result, cluster_first, cluster_second, enable_filesystem_cache, lock_object_storage_task_distribution_ms) + s3_get_first_sum += s3_get_first + s3_get_second_sum += s3_get_second + return s3_get_first_sum, s3_get_second_sum + + +@pytest.mark.parametrize("lock_object_storage_task_distribution_ms ", [0, 30000]) +def test_cache_locality(started_cluster, lock_object_storage_task_distribution_ms): + node = started_cluster.instances["clickhouse0"] + + expected_result = node.query( + f""" + SELECT count(*) + FROM s3('http://minio1:9001/root/data/generated/*', 'minio', '{minio_secret_key}', 'CSV', 'a String, b UInt64') + WHERE b=42 + """ + ) + + # Algorithm does not give 100% guarantee, so add 10% on dispersion + dispersion = 0.0 if lock_object_storage_task_distribution_ms > 0 else 0.1 + + # No cache + (s3_get_first, s3_get_second) = check_s3_gets_repeat(started_cluster, node, expected_result, 'cluster_12345', 'cluster_12345', 0, lock_object_storage_task_distribution_ms) + assert s3_get_second == s3_get_first + + # With cache + (s3_get_first, s3_get_second) = check_s3_gets_repeat(started_cluster, node, expected_result, 'cluster_12345', 'cluster_12345', 1, lock_object_storage_task_distribution_ms) + assert s3_get_second <= s3_get_first * dispersion + + # Different nodes order + (s3_get_first, s3_get_second) = check_s3_gets_repeat(started_cluster, node, expected_result, 'cluster_12345', 'cluster_34512', 1, lock_object_storage_task_distribution_ms) + assert s3_get_second <= s3_get_first * dispersion + + # No last node + (s3_get_first, s3_get_second) = check_s3_gets_repeat(started_cluster, node, expected_result, 'cluster_12345', 'cluster_1234', 1, lock_object_storage_task_distribution_ms) + assert s3_get_second <= s3_get_first * (0.211 + dispersion) # actual value - 24 for 100 files, 211 for 1000 + + # No first node + (s3_get_first, s3_get_second) = check_s3_gets_repeat(started_cluster, node, expected_result, 'cluster_12345', 'cluster_2345', 1, lock_object_storage_task_distribution_ms) + assert s3_get_second <= s3_get_first * (0.189 + dispersion) # actual value - 12 for 100 files, 189 for 1000 + + # No first node, different nodes order + (s3_get_first, s3_get_second) = check_s3_gets_repeat(started_cluster, node, expected_result, 'cluster_12345', 'cluster_4523', 1, lock_object_storage_task_distribution_ms) + assert s3_get_second <= s3_get_first * (0.189 + dispersion) + + # Add new node, different nodes order + (s3_get_first, s3_get_second) = check_s3_gets_repeat(started_cluster, node, expected_result, 'cluster_4523', 'cluster_12345', 1, lock_object_storage_task_distribution_ms) + assert s3_get_second <= s3_get_first * (0.189 + dispersion) + + # New node and old node, different nodes order + (s3_get_first, s3_get_second) = check_s3_gets_repeat(started_cluster, node, expected_result, 'cluster_1234', 'cluster_4523', 1, lock_object_storage_task_distribution_ms) + assert s3_get_second <= s3_get_first * (0.400 + dispersion) # actual value - 36 for 100 files, 400 for 1000 From 3be47a7d91bdb26a15fa6428b6ef089af97ac79e Mon Sep 17 00:00:00 2001 From: Vasily Nemkov Date: Thu, 11 Sep 2025 14:53:53 +0200 Subject: [PATCH 2/7] Merge pull request #780 from Altinity/feature/retries_in_cluster_functions Restart cluster tasks on connection lost --- src/Core/Protocol.h | 2 + src/Core/Settings.cpp | 3 + src/Core/SettingsChangesHistory.cpp | 2 + src/QueryPipeline/RemoteQueryExecutor.cpp | 28 +++++++- src/QueryPipeline/RemoteQueryExecutor.h | 22 +++++- .../RemoteQueryExecutorReadContext.cpp | 33 ++++++--- .../RemoteQueryExecutorReadContext.h | 6 +- .../StorageObjectStorageCluster.cpp | 45 ++++++++---- ...rageObjectStorageStableTaskDistributor.cpp | 72 ++++++++++++++++--- ...torageObjectStorageStableTaskDistributor.h | 5 ++ src/Storages/StorageFileCluster.cpp | 49 ++++++++++--- src/Storages/StorageURLCluster.cpp | 44 +++++++++--- 12 files changed, 254 insertions(+), 57 deletions(-) diff --git a/src/Core/Protocol.h b/src/Core/Protocol.h index 1db06e0f916d..9db589f4ca34 100644 --- a/src/Core/Protocol.h +++ b/src/Core/Protocol.h @@ -96,8 +96,10 @@ namespace Protocol MergeTreeReadTaskRequest = 16, /// Request from a MergeTree replica to a coordinator TimezoneUpdate = 17, /// Receive server's (session-wide) default timezone SSHChallenge = 18, /// Return challenge for SSH signature signing + MAX = SSHChallenge, + ConnectionLost = 255, /// Exception that occurred on the client side. }; /// NOTE: If the type of packet argument would be Enum, the comparison packet >= 0 && packet < 10 diff --git a/src/Core/Settings.cpp b/src/Core/Settings.cpp index 78799bf884f8..2083d8784881 100644 --- a/src/Core/Settings.cpp +++ b/src/Core/Settings.cpp @@ -7027,6 +7027,9 @@ DECLARE(Bool, allow_experimental_ytsaurus_dictionary_source, false, R"( )", EXPERIMENTAL) \ DECLARE(Bool, distributed_plan_force_shuffle_aggregation, false, R"( Use Shuffle aggregation strategy instead of PartialAggregation + Merge in distributed query plan. +)", EXPERIMENTAL) \ + DECLARE(Bool, allow_retries_in_cluster_requests, false, R"( +Allow retries in cluster request, when one node goes offline )", EXPERIMENTAL) \ \ /** Experimental timeSeries* aggregate functions. */ \ diff --git a/src/Core/SettingsChangesHistory.cpp b/src/Core/SettingsChangesHistory.cpp index b39ab27b41a2..7b2d7358a618 100644 --- a/src/Core/SettingsChangesHistory.cpp +++ b/src/Core/SettingsChangesHistory.cpp @@ -260,6 +260,8 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory() {"parallel_replicas_for_cluster_engines", false, true, "New setting."}, {"parallel_hash_join_threshold", 0, 0, "New setting"}, /// Release closed. Please use 25.4 + {"use_object_storage_list_objects_cache", true, false, "New setting."}, + {"allow_retries_in_cluster_requests", false, false, "New setting."}, }); addSettingsChanges(settings_changes_history, "25.2", { diff --git a/src/QueryPipeline/RemoteQueryExecutor.cpp b/src/QueryPipeline/RemoteQueryExecutor.cpp index 9984ca24117f..18b33fc81d6d 100644 --- a/src/QueryPipeline/RemoteQueryExecutor.cpp +++ b/src/QueryPipeline/RemoteQueryExecutor.cpp @@ -52,6 +52,7 @@ namespace Setting extern const SettingsBool use_hedged_requests; extern const SettingsBool push_external_roles_in_interserver_queries; extern const SettingsMilliseconds parallel_replicas_connect_timeout_ms; + extern const SettingsBool allow_retries_in_cluster_requests; } namespace ErrorCodes @@ -82,6 +83,7 @@ RemoteQueryExecutor::RemoteQueryExecutor( , extension(extension_) , priority_func(priority_func_) , read_packet_type_separately(context->canUseParallelReplicasOnInitiator() && !context->getSettingsRef()[Setting::use_hedged_requests]) + , allow_retries_in_cluster_requests(context->getSettingsRef()[Setting::allow_retries_in_cluster_requests]) { if (stage == QueryProcessingStage::QueryPlan && !query_plan) throw Exception(ErrorCodes::LOGICAL_ERROR, "Query plan is not passed for QueryPlan processing stage"); @@ -457,7 +459,8 @@ int RemoteQueryExecutor::sendQueryAsync() read_context = std::make_unique( *this, /*suspend_when_query_sent*/ true, - read_packet_type_separately); + read_packet_type_separately, + allow_retries_in_cluster_requests); /// If query already sent, do nothing. Note that we cannot use sent_query flag here, /// because we can still be in process of sending scalars or external tables. @@ -530,7 +533,8 @@ RemoteQueryExecutor::ReadResult RemoteQueryExecutor::readAsync() read_context = std::make_unique( *this, /*suspend_when_query_sent*/ false, - read_packet_type_separately); + read_packet_type_separately, + allow_retries_in_cluster_requests); recreate_read_context = false; } @@ -654,7 +658,11 @@ RemoteQueryExecutor::ReadResult RemoteQueryExecutor::processPacket(Packet packet /// We can actually return it, and the first call to RemoteQueryExecutor::read /// will return earlier. We should consider doing it. if (!packet.block.empty() && (packet.block.rows() > 0)) + { + if (extension && extension->replica_info) + replica_has_processed_data.insert(extension->replica_info->number_of_current_replica); return ReadResult(adaptBlockStructure(packet.block, *header)); + } break; /// If the block is empty - we will receive other packets before EndOfStream. case Protocol::Server::Exception: @@ -716,6 +724,22 @@ RemoteQueryExecutor::ReadResult RemoteQueryExecutor::processPacket(Packet packet case Protocol::Server::TimezoneUpdate: break; + case Protocol::Server::ConnectionLost: + if (allow_retries_in_cluster_requests) + { + if (extension && extension->task_iterator && extension->task_iterator->supportRerunTask() && extension->replica_info) + { + if (!replica_has_processed_data.contains(extension->replica_info->number_of_current_replica)) + { + finished = true; + extension->task_iterator->rescheduleTasksFromReplica(extension->replica_info->number_of_current_replica); + return ReadResult(Block{}); + } + } + } + packet.exception->rethrow(); + break; + default: got_unknown_packet_from_replica = true; throw Exception( diff --git a/src/QueryPipeline/RemoteQueryExecutor.h b/src/QueryPipeline/RemoteQueryExecutor.h index e3fb64bb0f7b..e73dc2fc3308 100644 --- a/src/QueryPipeline/RemoteQueryExecutor.h +++ b/src/QueryPipeline/RemoteQueryExecutor.h @@ -31,8 +31,22 @@ class RemoteQueryExecutorReadContext; class ParallelReplicasReadingCoordinator; -/// This is the same type as StorageS3Source::IteratorWrapper -using TaskIterator = std::function; +namespace ErrorCodes +{ + extern const int NOT_IMPLEMENTED; +}; + +class TaskIterator +{ +public: + virtual ~TaskIterator() = default; + virtual bool supportRerunTask() const { return false; } + virtual void rescheduleTasksFromReplica(size_t /* number_of_current_replica */) + { + throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method rescheduleTasksFromReplica is not implemented"); + } + virtual ClusterFunctionReadTaskResponsePtr operator()(size_t number_of_current_replica) const = 0; +}; /// This class allows one to launch queries on remote replicas of one shard and get results class RemoteQueryExecutor @@ -316,6 +330,10 @@ class RemoteQueryExecutor const bool read_packet_type_separately = false; + const bool allow_retries_in_cluster_requests = false; + + std::unordered_set replica_has_processed_data; + /// Send all scalars to remote servers void sendScalars(); diff --git a/src/QueryPipeline/RemoteQueryExecutorReadContext.cpp b/src/QueryPipeline/RemoteQueryExecutorReadContext.cpp index 9090d045daae..0e3fb4952eb4 100644 --- a/src/QueryPipeline/RemoteQueryExecutorReadContext.cpp +++ b/src/QueryPipeline/RemoteQueryExecutorReadContext.cpp @@ -19,11 +19,15 @@ namespace ErrorCodes } RemoteQueryExecutorReadContext::RemoteQueryExecutorReadContext( - RemoteQueryExecutor & executor_, bool suspend_when_query_sent_, bool read_packet_type_separately_) + RemoteQueryExecutor & executor_, + bool suspend_when_query_sent_, + bool read_packet_type_separately_, + bool allow_retries_in_cluster_requests_) : AsyncTaskExecutor(std::make_unique(*this)) , executor(executor_) , suspend_when_query_sent(suspend_when_query_sent_) , read_packet_type_separately(read_packet_type_separately_) + , allow_retries_in_cluster_requests(allow_retries_in_cluster_requests_) { if (-1 == pipe2(pipe_fd, O_NONBLOCK)) throw ErrnoException(ErrorCodes::CANNOT_OPEN_FILE, "Cannot create pipe"); @@ -54,18 +58,29 @@ void RemoteQueryExecutorReadContext::Task::run(AsyncCallback async_callback, Sus if (read_context.executor.needToSkipUnavailableShard()) return; - while (true) + try { - read_context.has_read_packet_part = PacketPart::None; - - if (read_context.read_packet_type_separately) + while (true) { - read_context.packet.type = read_context.executor.getConnections().receivePacketTypeUnlocked(async_callback); - read_context.has_read_packet_part = PacketPart::Type; + read_context.has_read_packet_part = PacketPart::None; + + if (read_context.read_packet_type_separately) + { + read_context.packet.type = read_context.executor.getConnections().receivePacketTypeUnlocked(async_callback); + read_context.has_read_packet_part = PacketPart::Type; + suspend_callback(); + } + read_context.packet = read_context.executor.getConnections().receivePacketUnlocked(async_callback); + read_context.has_read_packet_part = PacketPart::Body; suspend_callback(); } - read_context.packet = read_context.executor.getConnections().receivePacketUnlocked(async_callback); - read_context.has_read_packet_part = PacketPart::Body; + } + catch (const Exception &) + { + if (!read_context.allow_retries_in_cluster_requests) + throw; + read_context.packet.type = Protocol::Server::ConnectionLost; + read_context.packet.exception = std::make_unique(getCurrentExceptionMessageAndPattern(true), getCurrentExceptionCode()); suspend_callback(); } } diff --git a/src/QueryPipeline/RemoteQueryExecutorReadContext.h b/src/QueryPipeline/RemoteQueryExecutorReadContext.h index abde6cb93ef3..d850244bed6d 100644 --- a/src/QueryPipeline/RemoteQueryExecutorReadContext.h +++ b/src/QueryPipeline/RemoteQueryExecutorReadContext.h @@ -26,7 +26,10 @@ class RemoteQueryExecutorReadContext : public AsyncTaskExecutor { public: explicit RemoteQueryExecutorReadContext( - RemoteQueryExecutor & executor_, bool suspend_when_query_sent_, bool read_packet_type_separately_); + RemoteQueryExecutor & executor_, + bool suspend_when_query_sent_, + bool read_packet_type_separately_, + bool allow_retries_in_cluster_requests_); ~RemoteQueryExecutorReadContext() override; @@ -108,6 +111,7 @@ class RemoteQueryExecutorReadContext : public AsyncTaskExecutor bool suspend_when_query_sent = false; bool is_query_sent = false; const bool read_packet_type_separately = false; + const bool allow_retries_in_cluster_requests = false; }; } diff --git a/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp b/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp index dde4935372d5..afbed7d97aff 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp @@ -191,6 +191,36 @@ void StorageObjectStorageCluster::updateQueryToSendIfNeeded( } } +class TaskDistributor : public TaskIterator +{ +public: + TaskDistributor(std::shared_ptr iterator, + std::vector && ids_of_hosts, + bool send_over_whole_archive, + uint64_t lock_object_storage_task_distribution_ms, + ContextPtr context_ + ) + : task_distributor(iterator, std::move(ids_of_hosts), send_over_whole_archive, lock_object_storage_task_distribution_ms) + , context(context_) {} + ~TaskDistributor() override = default; + bool supportRerunTask() const override { return true; } + void rescheduleTasksFromReplica(size_t number_of_current_replica) override + { + task_distributor.rescheduleTasksFromReplica(number_of_current_replica); + } + + ClusterFunctionReadTaskResponsePtr operator()(size_t number_of_current_replica) const override + { + auto task = task_distributor.getNextTask(number_of_current_replica); + if (task) + return std::make_shared(std::move(task), context); + return std::make_shared(); + } + +private: + mutable StorageObjectStorageStableTaskDistributor task_distributor; + ContextPtr context; +}; RemoteQueryExecutor::Extension StorageObjectStorageCluster::getTaskIteratorExtension( const ActionsDAG::Node * predicate, @@ -238,20 +268,11 @@ RemoteQueryExecutor::Extension StorageObjectStorageCluster::getTaskIteratorExten lock_object_storage_task_distribution_ms_max ); - auto task_distributor = std::make_shared( - iterator, + auto callback = std::make_shared(iterator, std::move(ids_of_hosts), /* send_over_whole_archive */!local_context->getSettingsRef()[Setting::cluster_function_process_archive_on_multiple_nodes], - lock_object_storage_task_distribution_ms); - - auto callback = std::make_shared( - [task_distributor, local_context](size_t number_of_current_replica) mutable -> ClusterFunctionReadTaskResponsePtr - { - auto task = task_distributor->getNextTask(number_of_current_replica); - if (task) - return std::make_shared(std::move(task), local_context); - return std::make_shared(); - }); + lock_object_storage_task_distribution_ms, + local_context); return RemoteQueryExecutor::Extension{ .task_iterator = std::move(callback) }; } diff --git a/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp b/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp index 01322ac16fa2..e18618d6fc37 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp @@ -9,7 +9,8 @@ namespace DB namespace ErrorCodes { extern const int LOGICAL_ERROR; -} + extern const int CANNOT_READ_ALL_DATA; +}; StorageObjectStorageStableTaskDistributor::StorageObjectStorageStableTaskDistributor( std::shared_ptr iterator_, @@ -23,6 +24,9 @@ StorageObjectStorageStableTaskDistributor::StorageObjectStorageStableTaskDistrib , lock_object_storage_task_distribution_us(lock_object_storage_task_distribution_ms_ * 1000) , iterator_exhausted(false) { + size_t nodes = ids_of_nodes.size(); + for (size_t i = 0; i < nodes; ++i) + replica_to_files_to_be_processed[i] = std::list{}; } ObjectInfoPtr StorageObjectStorageStableTaskDistributor::getNextTask(size_t number_of_current_replica) @@ -31,16 +35,27 @@ ObjectInfoPtr StorageObjectStorageStableTaskDistributor::getNextTask(size_t numb saveLastNodeActivity(number_of_current_replica); - // 1. Check pre-queued files first - if (auto file = getPreQueuedFile(number_of_current_replica)) - return file; + auto processed_file_list_ptr = replica_to_files_to_be_processed.find(number_of_current_replica); + if (processed_file_list_ptr == replica_to_files_to_be_processed.end()) + throw Exception( + ErrorCodes::LOGICAL_ERROR, + "Replica number {} was marked as lost, can't set task for it anymore", + number_of_current_replica + ); + // 1. Check pre-queued files first + auto file = getPreQueuedFile(number_of_current_replica); // 2. Try to find a matching file from the iterator - if (auto file = getMatchingFileFromIterator(number_of_current_replica)) - return file; - + if (!file) + file = getMatchingFileFromIterator(number_of_current_replica); // 3. Process unprocessed files if iterator is exhausted - return getAnyUnprocessedFile(number_of_current_replica); + if (!file) + file = getAnyUnprocessedFile(number_of_current_replica); + + if (file) + processed_file_list_ptr->second.push_back(file); + + return file; } size_t StorageObjectStorageStableTaskDistributor::getReplicaForFile(const String & file_path) @@ -52,16 +67,27 @@ size_t StorageObjectStorageStableTaskDistributor::getReplicaForFile(const String return 0; /// Rendezvous hashing - size_t best_id = 0; - UInt64 best_weight = sipHash64(ids_of_nodes[0] + file_path); - for (size_t id = 1; id < nodes_count; ++id) + auto replica = replica_to_files_to_be_processed.begin(); + if (replica == replica_to_files_to_be_processed.end()) + throw Exception( + ErrorCodes::LOGICAL_ERROR, + "No active replicas, can't find best replica for file {}", + file_path + ); + + size_t best_id = replica->first; + UInt64 best_weight = sipHash64(ids_of_nodes[best_id] + file_path); + ++replica; + while (replica != replica_to_files_to_be_processed.end()) { + size_t id = replica->first; UInt64 weight = sipHash64(ids_of_nodes[id] + file_path); if (weight > best_weight) { best_weight = weight; best_id = id; } + ++replica; } return best_id; } @@ -234,4 +260,28 @@ void StorageObjectStorageStableTaskDistributor::saveLastNodeActivity(size_t numb last_node_activity[number_of_current_replica] = now; } +void StorageObjectStorageStableTaskDistributor::rescheduleTasksFromReplica(size_t number_of_current_replica) +{ + LOG_INFO(log, "Replica {} is marked as lost, tasks are returned to queue", number_of_current_replica); + std::lock_guard lock(mutex); + + auto processed_file_list_ptr = replica_to_files_to_be_processed.find(number_of_current_replica); + if (processed_file_list_ptr == replica_to_files_to_be_processed.end()) + throw Exception( + ErrorCodes::LOGICAL_ERROR, + "Replica number {} was marked as lost already", + number_of_current_replica + ); + + if (replica_to_files_to_be_processed.size() < 2) + throw Exception( + ErrorCodes::CANNOT_READ_ALL_DATA, + "All replicas were marked as lost" + ); + + replica_to_files_to_be_processed.erase(number_of_current_replica); + for (const auto & file : processed_file_list_ptr->second) + unprocessed_files.emplace(file->getPath(), std::make_pair(file, getReplicaForFile(file->getPath()))); +} + } diff --git a/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.h b/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.h index a65adfad8cbe..25673b3eeb02 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.h +++ b/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.h @@ -10,6 +10,7 @@ #include #include +#include #include #include #include @@ -28,6 +29,9 @@ class StorageObjectStorageStableTaskDistributor ObjectInfoPtr getNextTask(size_t number_of_current_replica); + /// Insert objects back to unprocessed files + void rescheduleTasksFromReplica(size_t number_of_current_replica); + private: size_t getReplicaForFile(const String & file_path); ObjectInfoPtr getPreQueuedFile(size_t number_of_current_replica); @@ -46,6 +50,7 @@ class StorageObjectStorageStableTaskDistributor std::unordered_map last_node_activity; Poco::Timestamp::TimeDiff lock_object_storage_task_distribution_us; + std::unordered_map> replica_to_files_to_be_processed; std::mutex mutex; bool iterator_exhausted = false; diff --git a/src/Storages/StorageFileCluster.cpp b/src/Storages/StorageFileCluster.cpp index 0dde974ce9e8..34d5474869af 100644 --- a/src/Storages/StorageFileCluster.cpp +++ b/src/Storages/StorageFileCluster.cpp @@ -95,21 +95,52 @@ void StorageFileCluster::updateQueryToSendIfNeeded(DB::ASTPtr & query, const Sto ); } +class FileTaskIterator : public TaskIterator +{ +public: + FileTaskIterator(const Strings & files, + std::optional archive_info, + const ActionsDAG::Node * predicate, + const NamesAndTypesList & virtual_columns, + const NamesAndTypesList & hive_partition_columns_to_read_from_file_path, + const ContextPtr & context, + bool distributed_processing = false) + : iterator(files + , archive_info + , predicate + , virtual_columns + , hive_partition_columns_to_read_from_file_path + , context + , distributed_processing) {} + + ~FileTaskIterator() override = default; + + ClusterFunctionReadTaskResponsePtr operator()(size_t /* number_of_current_replica */) const override + { + auto file = iterator.next(); + if (file.empty()) + return std::make_shared(); + return std::make_shared(std::move(file)); + } + +private: + mutable StorageFileSource::FilesIterator iterator; +}; + RemoteQueryExecutor::Extension StorageFileCluster::getTaskIteratorExtension( const ActionsDAG::Node * predicate, const ActionsDAG * /* filter */, const ContextPtr & context, ClusterPtr) const { - auto iterator = std::make_shared(paths, std::nullopt, predicate, getVirtualsList(), hive_partition_columns_to_read_from_file_path, context); - auto next_callback = [iter = std::move(iterator)](size_t) mutable -> ClusterFunctionReadTaskResponsePtr - { - auto file = iter->next(); - if (file.empty()) - return std::make_shared(); - return std::make_shared(std::move(file)); - }; - auto callback = std::make_shared(std::move(next_callback)); + auto callback = std::make_shared( + paths, + std::nullopt, + predicate, + getVirtualsList(), + hive_partition_columns_to_read_from_file_path, + context + ); return RemoteQueryExecutor::Extension{.task_iterator = std::move(callback)}; } diff --git a/src/Storages/StorageURLCluster.cpp b/src/Storages/StorageURLCluster.cpp index 72bda88c7431..16f171201f61 100644 --- a/src/Storages/StorageURLCluster.cpp +++ b/src/Storages/StorageURLCluster.cpp @@ -127,23 +127,45 @@ void StorageURLCluster::updateQueryToSendIfNeeded(ASTPtr & query, const StorageS ); } +class UrlTaskIterator : public TaskIterator +{ +public: + UrlTaskIterator(const String & uri, + size_t max_addresses, + const ActionsDAG::Node * predicate, + const NamesAndTypesList & virtual_columns, + const NamesAndTypesList & hive_partition_columns_to_read_from_file_path, + const ContextPtr & context) + : iterator(uri, max_addresses, predicate, virtual_columns, hive_partition_columns_to_read_from_file_path, context) {} + + ~UrlTaskIterator() override = default; + + ClusterFunctionReadTaskResponsePtr operator()(size_t /* number_of_current_replica */) const override + { + auto url = iterator.next(); + if (url.empty()) + return std::make_shared(); + return std::make_shared(std::move(url)); + } + +private: + mutable StorageURLSource::DisclosedGlobIterator iterator; +}; + RemoteQueryExecutor::Extension StorageURLCluster::getTaskIteratorExtension( const ActionsDAG::Node * predicate, const ActionsDAG * /* filter */, const ContextPtr & context, ClusterPtr) const { - auto iterator = std::make_shared( - uri, context->getSettingsRef()[Setting::glob_expansion_max_elements], predicate, getVirtualsList(), hive_partition_columns_to_read_from_file_path, context); - - auto next_callback = [iter = std::move(iterator)](size_t) mutable -> ClusterFunctionReadTaskResponsePtr - { - auto url = iter->next(); - if (url.empty()) - return std::make_shared(); - return std::make_shared(std::move(url)); - }; - auto callback = std::make_shared(std::move(next_callback)); + auto callback = std::make_shared( + uri, + context->getSettingsRef()[Setting::glob_expansion_max_elements], + predicate, + getVirtualsList(), + hive_partition_columns_to_read_from_file_path, + context + ); return RemoteQueryExecutor::Extension{.task_iterator = std::move(callback)}; } From 1f821d4efbb28b66a79531dd3544e59cb55c6efe Mon Sep 17 00:00:00 2001 From: Vasily Nemkov Date: Thu, 2 Oct 2025 10:33:40 +0200 Subject: [PATCH 3/7] Merge pull request #1042 from Altinity/bugfix/antalya-25.6.5/lock_object_storage_task_distribution_ms_lost_host Antalya 25.6: Fix lock_object_storage_task_distribution_ms, Changed lock_object_storage_task_distribution_ms value to 500 --- src/Core/Settings.cpp | 14 ++++++++++++-- src/Core/SettingsChangesHistory.cpp | 4 ++++ .../StorageObjectStorageStableTaskDistributor.cpp | 8 ++++++-- tests/integration/test_s3_cache_locality/test.py | 3 +-- 4 files changed, 23 insertions(+), 6 deletions(-) diff --git a/src/Core/Settings.cpp b/src/Core/Settings.cpp index 2083d8784881..d8cc947299ca 100644 --- a/src/Core/Settings.cpp +++ b/src/Core/Settings.cpp @@ -7001,8 +7001,18 @@ Default number of tasks for parallel reading in distributed query. Tasks are spr DECLARE(Bool, distributed_plan_optimize_exchanges, true, R"( Removes unnecessary exchanges in distributed query plan. Disable it for debugging. )", 0) \ - DECLARE(UInt64, lock_object_storage_task_distribution_ms, 0, R"( -In object storage distribution queries do not distibute tasks on non-prefetched nodes until prefetched node is active. + DECLARE(UInt64, lock_object_storage_task_distribution_ms, 500, R"( +In object storage distribution queries do not distribute tasks on non-prefetched nodes until prefetched node is active. +Determines how long the free executor node (one that finished processing all of it assigned tasks) should wait before "stealing" tasks from queue of currently busy executor nodes. + +Possible values: + +- 0 - steal tasks immediately after freeing up. +- >0 - wait for specified period of time before stealing tasks. + +Having this `>0` helps with cache reuse and might improve overall query time. +Because busy node might have warmed-up caches for this specific task, while free node needs to fetch lots of data from S3. +Which might take longer than just waiting for the busy node and generate extra traffic. )", EXPERIMENTAL) \ DECLARE(String, distributed_plan_force_exchange_kind, "", R"( Force specified kind of Exchange operators between distributed query stages. diff --git a/src/Core/SettingsChangesHistory.cpp b/src/Core/SettingsChangesHistory.cpp index 7b2d7358a618..ebfa0ac97a8e 100644 --- a/src/Core/SettingsChangesHistory.cpp +++ b/src/Core/SettingsChangesHistory.cpp @@ -39,6 +39,10 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory() /// controls new feature and it's 'true' by default, use 'false' as previous_value). /// It's used to implement `compatibility` setting (see https://github.com/ClickHouse/ClickHouse/issues/35972) /// Note: please check if the key already exists to prevent duplicate entries. + addSettingsChanges(settings_changes_history, "25.8.1.20364", + { + {"lock_object_storage_task_distribution_ms", 500, 500, "Raised the value to 500 to avoid hoping tasks between executors."}, + }); addSettingsChanges(settings_changes_history, "25.8", { {"output_format_json_quote_64bit_integers", true, false, "Disable quoting of the 64 bit integers in JSON by default"}, diff --git a/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp b/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp index e18618d6fc37..a617d0292820 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp @@ -189,7 +189,7 @@ ObjectInfoPtr StorageObjectStorageStableTaskDistributor::getMatchingFileFromIter // Queue file for its assigned replica { std::lock_guard lock(mutex); - unprocessed_files.emplace(file_path, std::make_pair(object_info, number_of_current_replica)); + unprocessed_files.emplace(file_path, std::make_pair(object_info, file_replica_idx)); connection_to_files[file_replica_idx].push_back(object_info); } } @@ -281,7 +281,11 @@ void StorageObjectStorageStableTaskDistributor::rescheduleTasksFromReplica(size_ replica_to_files_to_be_processed.erase(number_of_current_replica); for (const auto & file : processed_file_list_ptr->second) - unprocessed_files.emplace(file->getPath(), std::make_pair(file, getReplicaForFile(file->getPath()))); + { + auto file_replica_idx = getReplicaForFile(file->getPath()); + unprocessed_files.emplace(file->getPath(), std::make_pair(file, file_replica_idx)); + connection_to_files[file_replica_idx].push_back(file); + } } } diff --git a/tests/integration/test_s3_cache_locality/test.py b/tests/integration/test_s3_cache_locality/test.py index da85e78a5643..7d2fc2a2ada4 100644 --- a/tests/integration/test_s3_cache_locality/test.py +++ b/tests/integration/test_s3_cache_locality/test.py @@ -86,8 +86,7 @@ def check_s3_gets(cluster, node, expected_result, cluster_first, cluster_second, "filesystem_cache_name": "'raw_s3_cache'", } - if lock_object_storage_task_distribution_ms > 0: - settings["lock_object_storage_task_distribution_ms"] = lock_object_storage_task_distribution_ms + settings["lock_object_storage_task_distribution_ms"] = lock_object_storage_task_distribution_ms query_id_first = str(uuid.uuid4()) result_first = node.query( From e1373eed403f57b5036b1f93ca235f1363c94c5c Mon Sep 17 00:00:00 2001 From: Vasily Nemkov Date: Fri, 12 Sep 2025 13:06:40 +0200 Subject: [PATCH 4/7] Merge pull request #1014 from Altinity/feature/system_preshutdown_v2 SYSTEM STOP SWARM MODE command for graceful shutdown swarm node merge attempt v2 --- docs/en/sql-reference/statements/system.md | 6 ++ programs/server/Server.cpp | 4 + src/Access/Common/AccessType.h | 1 + src/Common/CurrentMetrics.cpp | 1 + src/Interpreters/ClusterDiscovery.cpp | 62 +++++++++++++- src/Interpreters/ClusterDiscovery.h | 13 +++ src/Interpreters/Context.cpp | 45 +++++++++- src/Interpreters/Context.h | 11 +++ src/Interpreters/InterpreterSystemQuery.cpp | 20 +++++ src/Parsers/ASTSystemQuery.cpp | 2 + src/Parsers/ASTSystemQuery.h | 2 + src/QueryPipeline/RemoteQueryExecutor.cpp | 5 ++ src/QueryPipeline/RemoteQueryExecutor.h | 2 + .../RemoteQueryExecutorReadContext.cpp | 36 ++++++-- .../RemoteQueryExecutorReadContext.h | 1 + .../StorageObjectStorageSource.cpp | 12 +++ .../test_s3_cluster/data/graceful/part0.csv | 1 + .../test_s3_cluster/data/graceful/part1.csv | 1 + .../test_s3_cluster/data/graceful/part2.csv | 1 + .../test_s3_cluster/data/graceful/part3.csv | 1 + .../test_s3_cluster/data/graceful/part4.csv | 1 + .../test_s3_cluster/data/graceful/part5.csv | 1 + .../test_s3_cluster/data/graceful/part6.csv | 1 + .../test_s3_cluster/data/graceful/part7.csv | 1 + .../test_s3_cluster/data/graceful/part8.csv | 1 + .../test_s3_cluster/data/graceful/part9.csv | 1 + .../test_s3_cluster/data/graceful/partA.csv | 1 + .../test_s3_cluster/data/graceful/partB.csv | 1 + .../test_s3_cluster/data/graceful/partC.csv | 1 + .../test_s3_cluster/data/graceful/partD.csv | 1 + .../test_s3_cluster/data/graceful/partE.csv | 1 + .../test_s3_cluster/data/graceful/partF.csv | 1 + tests/integration/test_s3_cluster/test.py | 82 +++++++++++++++++++ .../01271_show_privileges.reference | 1 + 34 files changed, 311 insertions(+), 11 deletions(-) create mode 100644 tests/integration/test_s3_cluster/data/graceful/part0.csv create mode 100644 tests/integration/test_s3_cluster/data/graceful/part1.csv create mode 100644 tests/integration/test_s3_cluster/data/graceful/part2.csv create mode 100644 tests/integration/test_s3_cluster/data/graceful/part3.csv create mode 100644 tests/integration/test_s3_cluster/data/graceful/part4.csv create mode 100644 tests/integration/test_s3_cluster/data/graceful/part5.csv create mode 100644 tests/integration/test_s3_cluster/data/graceful/part6.csv create mode 100644 tests/integration/test_s3_cluster/data/graceful/part7.csv create mode 100644 tests/integration/test_s3_cluster/data/graceful/part8.csv create mode 100644 tests/integration/test_s3_cluster/data/graceful/part9.csv create mode 100644 tests/integration/test_s3_cluster/data/graceful/partA.csv create mode 100644 tests/integration/test_s3_cluster/data/graceful/partB.csv create mode 100644 tests/integration/test_s3_cluster/data/graceful/partC.csv create mode 100644 tests/integration/test_s3_cluster/data/graceful/partD.csv create mode 100644 tests/integration/test_s3_cluster/data/graceful/partE.csv create mode 100644 tests/integration/test_s3_cluster/data/graceful/partF.csv diff --git a/docs/en/sql-reference/statements/system.md b/docs/en/sql-reference/statements/system.md index 3168254596c5..4524a84a98d2 100644 --- a/docs/en/sql-reference/statements/system.md +++ b/docs/en/sql-reference/statements/system.md @@ -206,6 +206,12 @@ SYSTEM RELOAD USERS [ON CLUSTER cluster_name] Normally shuts down ClickHouse (like `service clickhouse-server stop` / `kill {$pid_clickhouse-server}`) +## PRESHUTDOWN {#preshutdown} + + + +Prepare node for graceful shutdown. Unregister in autodiscovered clusters, stop accepting distributed requests to object storages (s3Cluster, icebergCluster, etc.). + ## KILL {#kill} Aborts ClickHouse process (like `kill -9 {$ pid_clickhouse-server}`) diff --git a/programs/server/Server.cpp b/programs/server/Server.cpp index c02cf2e89e8b..0dad367c1933 100644 --- a/programs/server/Server.cpp +++ b/programs/server/Server.cpp @@ -2329,6 +2329,8 @@ try } + global_context->startSwarmMode(); + { std::lock_guard lock(servers_lock); /// We should start interserver communications before (and more important shutdown after) tables. @@ -2777,6 +2779,8 @@ try is_cancelled = true; + global_context->stopSwarmMode(); + LOG_DEBUG(log, "Waiting for current connections to close."); size_t current_connections = 0; diff --git a/src/Access/Common/AccessType.h b/src/Access/Common/AccessType.h index 13a9911c702e..7fb8cdb95243 100644 --- a/src/Access/Common/AccessType.h +++ b/src/Access/Common/AccessType.h @@ -334,6 +334,7 @@ enum class AccessType : uint8_t M(SYSTEM_TTL_MERGES, "SYSTEM STOP TTL MERGES, SYSTEM START TTL MERGES, STOP TTL MERGES, START TTL MERGES", TABLE, SYSTEM) \ M(SYSTEM_FETCHES, "SYSTEM STOP FETCHES, SYSTEM START FETCHES, STOP FETCHES, START FETCHES", TABLE, SYSTEM) \ M(SYSTEM_MOVES, "SYSTEM STOP MOVES, SYSTEM START MOVES, STOP MOVES, START MOVES", TABLE, SYSTEM) \ + M(SYSTEM_SWARM, "SYSTEM STOP SWARM MODE, SYSTEM START SWARM MODE, STOP SWARM MODE, START SWARM MODE", GLOBAL, SYSTEM) \ M(SYSTEM_PULLING_REPLICATION_LOG, "SYSTEM STOP PULLING REPLICATION LOG, SYSTEM START PULLING REPLICATION LOG", TABLE, SYSTEM) \ M(SYSTEM_CLEANUP, "SYSTEM STOP CLEANUP, SYSTEM START CLEANUP", TABLE, SYSTEM) \ M(SYSTEM_VIEWS, "SYSTEM REFRESH VIEW, SYSTEM START VIEWS, SYSTEM STOP VIEWS, SYSTEM START VIEW, SYSTEM STOP VIEW, SYSTEM CANCEL VIEW, REFRESH VIEW, START VIEWS, STOP VIEWS, START VIEW, STOP VIEW, CANCEL VIEW", VIEW, SYSTEM) \ diff --git a/src/Common/CurrentMetrics.cpp b/src/Common/CurrentMetrics.cpp index 41b16996ba74..3c206ff6d017 100644 --- a/src/Common/CurrentMetrics.cpp +++ b/src/Common/CurrentMetrics.cpp @@ -431,6 +431,7 @@ M(StartupScriptsExecutionState, "State of startup scripts execution: 0 = not finished, 1 = success, 2 = failure.") \ \ M(IsServerShuttingDown, "Indicates if the server is shutting down: 0 = no, 1 = yes") \ + M(IsSwarmModeEnabled, "Indicates if the swarm mode enabled or not: 0 = disabled, 1 = enabled") \ \ M(StatelessWorkerThreads, "Number of threads in the stateless worker thread pool.") \ M(StatelessWorkerThreadsActive, "Number of threads in the stateless worker thread pool running a task.") \ diff --git a/src/Interpreters/ClusterDiscovery.cpp b/src/Interpreters/ClusterDiscovery.cpp index 033f1379543c..6c328a3f7beb 100644 --- a/src/Interpreters/ClusterDiscovery.cpp +++ b/src/Interpreters/ClusterDiscovery.cpp @@ -108,6 +108,13 @@ class ClusterDiscovery::Flags cv.notify_one(); } + void wakeup() + { + std::unique_lock lk(mu); + any_need_update = true; + cv.notify_one(); + } + private: std::condition_variable cv; std::mutex mu; @@ -391,7 +398,9 @@ bool ClusterDiscovery::upsertCluster(ClusterInfo & cluster_info) return true; }; - if (!cluster_info.current_node_is_observer && !contains(node_uuids, current_node_name)) + if (!cluster_info.current_node_is_observer + && context->isSwarmModeEnabled() + && !contains(node_uuids, current_node_name)) { LOG_ERROR(log, "Can't find current node in cluster '{}', will register again", cluster_info.name); registerInZk(zk, cluster_info); @@ -455,12 +464,30 @@ void ClusterDiscovery::registerInZk(zkutil::ZooKeeperPtr & zk, ClusterInfo & inf return; } + if (!context->isSwarmModeEnabled()) + { + LOG_DEBUG(log, "STOP SWARM MODE called, skip self-registering current node {} in cluster {}", current_node_name, info.name); + return; + } + LOG_DEBUG(log, "Registering current node {} in cluster {}", current_node_name, info.name); zk->createOrUpdate(node_path, info.current_node.serialize(), zkutil::CreateMode::Ephemeral); LOG_DEBUG(log, "Current node {} registered in cluster {}", current_node_name, info.name); } +void ClusterDiscovery::unregisterFromZk(zkutil::ZooKeeperPtr & zk, ClusterInfo & info) +{ + if (info.current_node_is_observer) + return; + + String node_path = getShardsListPath(info.zk_root) / current_node_name; + LOG_DEBUG(log, "Removing current node {} from cluster {}", current_node_name, info.name); + + zk->remove(node_path); + LOG_DEBUG(log, "Current node {} removed from cluster {}", current_node_name, info.name); +} + void ClusterDiscovery::initialUpdate() { LOG_DEBUG(log, "Initializing"); @@ -506,6 +533,18 @@ void ClusterDiscovery::initialUpdate() is_initialized = true; } +void ClusterDiscovery::registerAll() +{ + register_change_flag = RegisterChangeFlag::RCF_REGISTER_ALL; + clusters_to_update->wakeup(); +} + +void ClusterDiscovery::unregisterAll() +{ + register_change_flag = RegisterChangeFlag::RCF_UNREGISTER_ALL; + clusters_to_update->wakeup(); +} + void ClusterDiscovery::findDynamicClusters( std::unordered_map & info, std::unordered_set * unchanged_roots) @@ -729,6 +768,27 @@ bool ClusterDiscovery::runMainThread(std::function up_to_date_callback) { up_to_date_callback(); } + + RegisterChangeFlag flag = register_change_flag.exchange(RegisterChangeFlag::RCF_NONE); + + if (flag == RegisterChangeFlag::RCF_REGISTER_ALL) + { + LOG_DEBUG(log, "Register in all dynamic clusters"); + for (auto & [_, info] : clusters_info) + { + auto zk = context->getDefaultOrAuxiliaryZooKeeper(info.zk_name); + registerInZk(zk, info); + } + } + else if (flag == RegisterChangeFlag::RCF_UNREGISTER_ALL) + { + LOG_DEBUG(log, "Unregister in all dynamic clusters"); + for (auto & [_, info] : clusters_info) + { + auto zk = context->getDefaultOrAuxiliaryZooKeeper(info.zk_name); + unregisterFromZk(zk, info); + } + } } LOG_DEBUG(log, "Worker thread stopped"); return finished; diff --git a/src/Interpreters/ClusterDiscovery.h b/src/Interpreters/ClusterDiscovery.h index c0e4af3b86f3..2d3bbe489f4e 100644 --- a/src/Interpreters/ClusterDiscovery.h +++ b/src/Interpreters/ClusterDiscovery.h @@ -38,6 +38,9 @@ class ClusterDiscovery ~ClusterDiscovery(); + void registerAll(); + void unregisterAll(); + private: struct NodeInfo { @@ -125,6 +128,7 @@ class ClusterDiscovery void initialUpdate(); void registerInZk(zkutil::ZooKeeperPtr & zk, ClusterInfo & info); + void unregisterFromZk(zkutil::ZooKeeperPtr & zk, ClusterInfo & info); Strings getNodeNames(zkutil::ZooKeeperPtr & zk, const String & zk_root, @@ -207,6 +211,15 @@ class ClusterDiscovery std::shared_ptr>> multicluster_discovery_paths; MultiVersion::Version macros; + + enum RegisterChangeFlag + { + RCF_NONE, + RCF_REGISTER_ALL, + RCF_UNREGISTER_ALL, + }; + + std::atomic register_change_flag = RegisterChangeFlag::RCF_NONE; }; } diff --git a/src/Interpreters/Context.cpp b/src/Interpreters/Context.cpp index 91a63763a7c4..d32d29ba6e0e 100644 --- a/src/Interpreters/Context.cpp +++ b/src/Interpreters/Context.cpp @@ -224,6 +224,7 @@ namespace CurrentMetrics extern const Metric UncompressedCacheCells; extern const Metric IndexUncompressedCacheBytes; extern const Metric IndexUncompressedCacheCells; + extern const Metric IsSwarmModeEnabled; } @@ -620,6 +621,7 @@ struct ContextSharedPart : boost::noncopyable std::map server_ports; std::atomic shutdown_called = false; + std::atomic swarm_mode_enabled = true; Stopwatch uptime_watch TSA_GUARDED_BY(mutex); @@ -788,6 +790,8 @@ struct ContextSharedPart : boost::noncopyable */ void shutdown() TSA_NO_THREAD_SAFETY_ANALYSIS { + swarm_mode_enabled = false; + CurrentMetrics::set(CurrentMetrics::IsSwarmModeEnabled, 0); bool is_shutdown_called = shutdown_called.exchange(true); if (is_shutdown_called) return; @@ -4825,7 +4829,6 @@ std::shared_ptr Context::getCluster(const std::string & cluster_name) c throw Exception(ErrorCodes::CLUSTER_DOESNT_EXIST, "Requested cluster '{}' not found", cluster_name); } - std::shared_ptr Context::tryGetCluster(const std::string & cluster_name) const { std::shared_ptr res = nullptr; @@ -4844,6 +4847,21 @@ std::shared_ptr Context::tryGetCluster(const std::string & cluster_name return res; } +void Context::unregisterInAutodiscoveryClusters() +{ + std::lock_guard lock(shared->clusters_mutex); + if (!shared->cluster_discovery) + return; + shared->cluster_discovery->unregisterAll(); +} + +void Context::registerInAutodiscoveryClusters() +{ + std::lock_guard lock(shared->clusters_mutex); + if (!shared->cluster_discovery) + return; + shared->cluster_discovery->registerAll(); +} void Context::reloadClusterConfig() const { @@ -5754,12 +5772,35 @@ void Context::stopServers(const ServerType & server_type) const shared->stop_servers_callback(server_type); } - void Context::shutdown() TSA_NO_THREAD_SAFETY_ANALYSIS { shared->shutdown(); } +bool Context::stopSwarmMode() +{ + bool expected_is_enabled = true; + bool is_stopped_now = shared->swarm_mode_enabled.compare_exchange_strong(expected_is_enabled, false); + if (is_stopped_now) + CurrentMetrics::set(CurrentMetrics::IsSwarmModeEnabled, 0); + // return true if stop successful + return is_stopped_now; +} + +bool Context::startSwarmMode() +{ + bool expected_is_enabled = false; + bool is_started_now = shared->swarm_mode_enabled.compare_exchange_strong(expected_is_enabled, true); + if (is_started_now) + CurrentMetrics::set(CurrentMetrics::IsSwarmModeEnabled, 1); + // return true if start successful + return is_started_now; +} + +bool Context::isSwarmModeEnabled() const +{ + return shared->swarm_mode_enabled; +} Context::ApplicationType Context::getApplicationType() const { diff --git a/src/Interpreters/Context.h b/src/Interpreters/Context.h index a7332ae287dc..07121778b869 100644 --- a/src/Interpreters/Context.h +++ b/src/Interpreters/Context.h @@ -1318,6 +1318,8 @@ class Context: public ContextData, public std::enable_shared_from_this size_t getClustersVersion() const; void startClusterDiscovery(); + void registerInAutodiscoveryClusters(); + void unregisterInAutodiscoveryClusters(); /// Sets custom cluster, but doesn't update configuration void setCluster(const String & cluster_name, const std::shared_ptr & cluster); @@ -1433,6 +1435,15 @@ class Context: public ContextData, public std::enable_shared_from_this void shutdown(); + /// Stop some works to allow graceful shutdown later. + /// Returns true if stop successful. + bool stopSwarmMode(); + /// Resume some works if we change our mind. + /// Returns true if start successful. + bool startSwarmMode(); + /// Return current swarm mode state. + bool isSwarmModeEnabled() const; + bool isInternalQuery() const { return is_internal_query; } void setInternalQuery(bool internal) { is_internal_query = internal; } diff --git a/src/Interpreters/InterpreterSystemQuery.cpp b/src/Interpreters/InterpreterSystemQuery.cpp index 76a09ca9ff9e..793e3fc0a678 100644 --- a/src/Interpreters/InterpreterSystemQuery.cpp +++ b/src/Interpreters/InterpreterSystemQuery.cpp @@ -702,6 +702,20 @@ BlockIO InterpreterSystemQuery::execute() case Type::START_MOVES: startStopAction(ActionLocks::PartsMove, true); break; + case Type::STOP_SWARM_MODE: + { + getContext()->checkAccess(AccessType::SYSTEM_SWARM); + if (getContext()->stopSwarmMode()) + getContext()->unregisterInAutodiscoveryClusters(); + break; + } + case Type::START_SWARM_MODE: + { + getContext()->checkAccess(AccessType::SYSTEM_SWARM); + if (getContext()->startSwarmMode()) + getContext()->registerInAutodiscoveryClusters(); + break; + } case Type::STOP_FETCHES: startStopAction(ActionLocks::PartsFetch, false); break; @@ -1643,6 +1657,12 @@ AccessRightsElements InterpreterSystemQuery::getRequiredAccessForDDLOnCluster() required_access.emplace_back(AccessType::SYSTEM_MOVES, query.getDatabase(), query.getTable()); break; } + case Type::STOP_SWARM_MODE: + case Type::START_SWARM_MODE: + { + required_access.emplace_back(AccessType::SYSTEM_SWARM); + break; + } case Type::STOP_PULLING_REPLICATION_LOG: case Type::START_PULLING_REPLICATION_LOG: { diff --git a/src/Parsers/ASTSystemQuery.cpp b/src/Parsers/ASTSystemQuery.cpp index 9cdd034f2ca3..12f1e7ac4d71 100644 --- a/src/Parsers/ASTSystemQuery.cpp +++ b/src/Parsers/ASTSystemQuery.cpp @@ -510,6 +510,8 @@ void ASTSystemQuery::formatImpl(WriteBuffer & ostr, const FormatSettings & setti case Type::DROP_PAGE_CACHE: case Type::STOP_REPLICATED_DDL_QUERIES: case Type::START_REPLICATED_DDL_QUERIES: + case Type::STOP_SWARM_MODE: + case Type::START_SWARM_MODE: break; case Type::UNKNOWN: case Type::END: diff --git a/src/Parsers/ASTSystemQuery.h b/src/Parsers/ASTSystemQuery.h index cb21d3d12ba2..3226a93eed63 100644 --- a/src/Parsers/ASTSystemQuery.h +++ b/src/Parsers/ASTSystemQuery.h @@ -80,6 +80,8 @@ class ASTSystemQuery : public IAST, public ASTQueryWithOnCluster START_FETCHES, STOP_MOVES, START_MOVES, + STOP_SWARM_MODE, + START_SWARM_MODE, STOP_REPLICATED_SENDS, START_REPLICATED_SENDS, STOP_REPLICATION_QUEUES, diff --git a/src/QueryPipeline/RemoteQueryExecutor.cpp b/src/QueryPipeline/RemoteQueryExecutor.cpp index 18b33fc81d6d..1e45649ad699 100644 --- a/src/QueryPipeline/RemoteQueryExecutor.cpp +++ b/src/QueryPipeline/RemoteQueryExecutor.cpp @@ -1007,6 +1007,11 @@ void RemoteQueryExecutor::setProfileInfoCallback(ProfileInfoCallback callback) profile_info_callback = std::move(callback); } +bool RemoteQueryExecutor::skipUnavailableShards() const +{ + return context->getSettingsRef()[Setting::skip_unavailable_shards]; +} + bool RemoteQueryExecutor::needToSkipUnavailableShard() const { return context->getSettingsRef()[Setting::skip_unavailable_shards] && (0 == connections->size()); diff --git a/src/QueryPipeline/RemoteQueryExecutor.h b/src/QueryPipeline/RemoteQueryExecutor.h index e73dc2fc3308..11deb4f2c9b2 100644 --- a/src/QueryPipeline/RemoteQueryExecutor.h +++ b/src/QueryPipeline/RemoteQueryExecutor.h @@ -229,6 +229,8 @@ class RemoteQueryExecutor IConnections & getConnections() { return *connections; } + bool skipUnavailableShards() const; + bool needToSkipUnavailableShard() const; bool isReplicaUnavailable() const { return extension && extension->parallel_reading_coordinator && connections->size() == 0; } diff --git a/src/QueryPipeline/RemoteQueryExecutorReadContext.cpp b/src/QueryPipeline/RemoteQueryExecutorReadContext.cpp index 0e3fb4952eb4..08216dffe443 100644 --- a/src/QueryPipeline/RemoteQueryExecutorReadContext.cpp +++ b/src/QueryPipeline/RemoteQueryExecutorReadContext.cpp @@ -16,6 +16,7 @@ namespace ErrorCodes extern const int CANNOT_READ_FROM_SOCKET; extern const int CANNOT_OPEN_FILE; extern const int SOCKET_TIMEOUT; + extern const int ATTEMPT_TO_READ_AFTER_EOF; } RemoteQueryExecutorReadContext::RemoteQueryExecutorReadContext( @@ -62,16 +63,35 @@ void RemoteQueryExecutorReadContext::Task::run(AsyncCallback async_callback, Sus { while (true) { - read_context.has_read_packet_part = PacketPart::None; - - if (read_context.read_packet_type_separately) + try + { + read_context.has_read_packet_part = PacketPart::None; + + if (read_context.read_packet_type_separately) + { + read_context.packet.type = read_context.executor.getConnections().receivePacketTypeUnlocked(async_callback); + read_context.has_read_packet_part = PacketPart::Type; + suspend_callback(); + } + read_context.packet = read_context.executor.getConnections().receivePacketUnlocked(async_callback); + read_context.has_read_packet_part = PacketPart::Body; + if (read_context.packet.type == Protocol::Server::Data) + read_context.has_data_packets = true; + } + catch (const Exception & e) { - read_context.packet.type = read_context.executor.getConnections().receivePacketTypeUnlocked(async_callback); - read_context.has_read_packet_part = PacketPart::Type; - suspend_callback(); + /// If cluster node unxepectedly shutted down (kill/segfault/power off/etc.) socket just closes. + /// If initiator did not process any data packets before, this fact can be ignored. + /// Unprocessed tasks will be executed on other nodes. + if (e.code() == ErrorCodes::ATTEMPT_TO_READ_AFTER_EOF + && !read_context.has_data_packets.load() && read_context.executor.skipUnavailableShards()) + { + read_context.has_read_packet_part = PacketPart::None; + } + else + throw; } - read_context.packet = read_context.executor.getConnections().receivePacketUnlocked(async_callback); - read_context.has_read_packet_part = PacketPart::Body; + suspend_callback(); } } diff --git a/src/QueryPipeline/RemoteQueryExecutorReadContext.h b/src/QueryPipeline/RemoteQueryExecutorReadContext.h index d850244bed6d..82bb28f81264 100644 --- a/src/QueryPipeline/RemoteQueryExecutorReadContext.h +++ b/src/QueryPipeline/RemoteQueryExecutorReadContext.h @@ -88,6 +88,7 @@ class RemoteQueryExecutorReadContext : public AsyncTaskExecutor /// None -> Type -> Body -> None /// None -> Body -> None std::atomic has_read_packet_part = PacketPart::None; + std::atomic_bool has_data_packets = false; Packet packet; RemoteQueryExecutor & executor; diff --git a/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp b/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp index 7823fd0a1022..a4ebc59b3647 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp @@ -1067,6 +1067,12 @@ StorageObjectStorageSource::ReadTaskIterator::ReadTaskIterator( , is_archive(is_archive_) , object_storage(object_storage_) { + if (!getContext()->isSwarmModeEnabled()) + { + LOG_DEBUG(getLogger("StorageObjectStorageSource"), "STOP SWARM MODE called, stop getting new tasks"); + return; + } + ThreadPool pool( CurrentMetrics::StorageObjectStorageThreads, CurrentMetrics::StorageObjectStorageThreadsActive, @@ -1102,6 +1108,12 @@ StorageObjectStorage::ObjectInfoPtr StorageObjectStorageSource::ReadTaskIterator ObjectInfoPtr object_info; if (current_index >= buffer.size()) { + if (!getContext()->isSwarmModeEnabled()) + { + LOG_DEBUG(getLogger("StorageObjectStorageSource"), "STOP SWARM MODE called, stop getting new tasks"); + return nullptr; + } + auto task = callback(); if (!task || task->isEmpty()) return nullptr; diff --git a/tests/integration/test_s3_cluster/data/graceful/part0.csv b/tests/integration/test_s3_cluster/data/graceful/part0.csv new file mode 100644 index 000000000000..2a8ceabbea58 --- /dev/null +++ b/tests/integration/test_s3_cluster/data/graceful/part0.csv @@ -0,0 +1 @@ +0,"Foo" \ No newline at end of file diff --git a/tests/integration/test_s3_cluster/data/graceful/part1.csv b/tests/integration/test_s3_cluster/data/graceful/part1.csv new file mode 100644 index 000000000000..1950012fffd2 --- /dev/null +++ b/tests/integration/test_s3_cluster/data/graceful/part1.csv @@ -0,0 +1 @@ +1,"Bar" \ No newline at end of file diff --git a/tests/integration/test_s3_cluster/data/graceful/part2.csv b/tests/integration/test_s3_cluster/data/graceful/part2.csv new file mode 100644 index 000000000000..dc782d5adf9b --- /dev/null +++ b/tests/integration/test_s3_cluster/data/graceful/part2.csv @@ -0,0 +1 @@ +2,"Foo" \ No newline at end of file diff --git a/tests/integration/test_s3_cluster/data/graceful/part3.csv b/tests/integration/test_s3_cluster/data/graceful/part3.csv new file mode 100644 index 000000000000..6e581549d23c --- /dev/null +++ b/tests/integration/test_s3_cluster/data/graceful/part3.csv @@ -0,0 +1 @@ +3,"Bar" \ No newline at end of file diff --git a/tests/integration/test_s3_cluster/data/graceful/part4.csv b/tests/integration/test_s3_cluster/data/graceful/part4.csv new file mode 100644 index 000000000000..bb5a4d956c51 --- /dev/null +++ b/tests/integration/test_s3_cluster/data/graceful/part4.csv @@ -0,0 +1 @@ +4,"Foo" \ No newline at end of file diff --git a/tests/integration/test_s3_cluster/data/graceful/part5.csv b/tests/integration/test_s3_cluster/data/graceful/part5.csv new file mode 100644 index 000000000000..5cb2c6be144b --- /dev/null +++ b/tests/integration/test_s3_cluster/data/graceful/part5.csv @@ -0,0 +1 @@ +5,"Bar" \ No newline at end of file diff --git a/tests/integration/test_s3_cluster/data/graceful/part6.csv b/tests/integration/test_s3_cluster/data/graceful/part6.csv new file mode 100644 index 000000000000..e2e2428d100d --- /dev/null +++ b/tests/integration/test_s3_cluster/data/graceful/part6.csv @@ -0,0 +1 @@ +6,"Foo" \ No newline at end of file diff --git a/tests/integration/test_s3_cluster/data/graceful/part7.csv b/tests/integration/test_s3_cluster/data/graceful/part7.csv new file mode 100644 index 000000000000..3c819a315c20 --- /dev/null +++ b/tests/integration/test_s3_cluster/data/graceful/part7.csv @@ -0,0 +1 @@ +7,"Bar" \ No newline at end of file diff --git a/tests/integration/test_s3_cluster/data/graceful/part8.csv b/tests/integration/test_s3_cluster/data/graceful/part8.csv new file mode 100644 index 000000000000..72f39e512be3 --- /dev/null +++ b/tests/integration/test_s3_cluster/data/graceful/part8.csv @@ -0,0 +1 @@ +8,"Foo" \ No newline at end of file diff --git a/tests/integration/test_s3_cluster/data/graceful/part9.csv b/tests/integration/test_s3_cluster/data/graceful/part9.csv new file mode 100644 index 000000000000..f288cb2051dd --- /dev/null +++ b/tests/integration/test_s3_cluster/data/graceful/part9.csv @@ -0,0 +1 @@ +9,"Bar" \ No newline at end of file diff --git a/tests/integration/test_s3_cluster/data/graceful/partA.csv b/tests/integration/test_s3_cluster/data/graceful/partA.csv new file mode 100644 index 000000000000..da99f68ba784 --- /dev/null +++ b/tests/integration/test_s3_cluster/data/graceful/partA.csv @@ -0,0 +1 @@ +10,"Foo" \ No newline at end of file diff --git a/tests/integration/test_s3_cluster/data/graceful/partB.csv b/tests/integration/test_s3_cluster/data/graceful/partB.csv new file mode 100644 index 000000000000..46591e0be815 --- /dev/null +++ b/tests/integration/test_s3_cluster/data/graceful/partB.csv @@ -0,0 +1 @@ +11,"Bar" \ No newline at end of file diff --git a/tests/integration/test_s3_cluster/data/graceful/partC.csv b/tests/integration/test_s3_cluster/data/graceful/partC.csv new file mode 100644 index 000000000000..24af8010b5c6 --- /dev/null +++ b/tests/integration/test_s3_cluster/data/graceful/partC.csv @@ -0,0 +1 @@ +12,"Foo" \ No newline at end of file diff --git a/tests/integration/test_s3_cluster/data/graceful/partD.csv b/tests/integration/test_s3_cluster/data/graceful/partD.csv new file mode 100644 index 000000000000..0365a5024871 --- /dev/null +++ b/tests/integration/test_s3_cluster/data/graceful/partD.csv @@ -0,0 +1 @@ +13,"Bar" \ No newline at end of file diff --git a/tests/integration/test_s3_cluster/data/graceful/partE.csv b/tests/integration/test_s3_cluster/data/graceful/partE.csv new file mode 100644 index 000000000000..3143c0eed915 --- /dev/null +++ b/tests/integration/test_s3_cluster/data/graceful/partE.csv @@ -0,0 +1 @@ +14,"Foo" \ No newline at end of file diff --git a/tests/integration/test_s3_cluster/data/graceful/partF.csv b/tests/integration/test_s3_cluster/data/graceful/partF.csv new file mode 100644 index 000000000000..d0306b9bb806 --- /dev/null +++ b/tests/integration/test_s3_cluster/data/graceful/partF.csv @@ -0,0 +1 @@ +15,"Bar" \ No newline at end of file diff --git a/tests/integration/test_s3_cluster/test.py b/tests/integration/test_s3_cluster/test.py index 76b8f0df2881..d95d109031d6 100644 --- a/tests/integration/test_s3_cluster/test.py +++ b/tests/integration/test_s3_cluster/test.py @@ -2,12 +2,15 @@ import logging import os import shutil +import uuid import time from email.errors import HeaderParseError +import threading import pytest from helpers.cluster import ClickHouseCluster +from helpers.client import QueryRuntimeException from helpers.config_cluster import minio_secret_key from helpers.mock_servers import start_mock_servers from helpers.test_tools import TSV @@ -21,6 +24,22 @@ "data/clickhouse/part123.csv", "data/database/part2.csv", "data/database/partition675.csv", + "data/graceful/part0.csv", + "data/graceful/part1.csv", + "data/graceful/part2.csv", + "data/graceful/part3.csv", + "data/graceful/part4.csv", + "data/graceful/part5.csv", + "data/graceful/part6.csv", + "data/graceful/part7.csv", + "data/graceful/part8.csv", + "data/graceful/part9.csv", + "data/graceful/partA.csv", + "data/graceful/partB.csv", + "data/graceful/partC.csv", + "data/graceful/partD.csv", + "data/graceful/partE.csv", + "data/graceful/partF.csv", ] @@ -76,6 +95,7 @@ def started_cluster(): macros={"replica": "node1", "shard": "shard1"}, with_minio=True, with_zookeeper=True, + stay_alive=True, ) cluster.add_instance( "s0_0_1", @@ -83,6 +103,7 @@ def started_cluster(): user_configs=["configs/users.xml"], macros={"replica": "replica2", "shard": "shard1"}, with_zookeeper=True, + stay_alive=True, ) cluster.add_instance( "s0_1_0", @@ -90,6 +111,7 @@ def started_cluster(): user_configs=["configs/users.xml"], macros={"replica": "replica1", "shard": "shard2"}, with_zookeeper=True, + stay_alive=True, ) logging.info("Starting cluster...") @@ -509,3 +531,63 @@ def test_cluster_default_expression(started_cluster): ) assert result == expected_result + + +def test_graceful_shutdown(started_cluster): + node = started_cluster.instances["s0_0_0"] + node_to_shutdown = started_cluster.instances["s0_1_0"] + + expected = TSV("64\tBar\t8\n56\tFoo\t8\n") + + num_lock = threading.Lock() + errors = 0 + + def query_cycle(): + nonlocal errors + try: + i = 0 + while i < 10: + i += 1 + # Query time 3-4 seconds + # Processing single object 1-2 seconds + result = node.query(f""" + SELECT sum(value),name,sum(sleep(1)+1) as sleep FROM s3Cluster( + 'cluster_simple', + 'http://minio1:9001/root/data/graceful/*', 'minio', '{minio_secret_key}', 'CSV', + 'value UInt32, name String') + GROUP BY name + ORDER BY name + SETTINGS max_threads=2 + """) + with num_lock: + if TSV(result) != expected: + errors += 1 + if errors >= 1: + break + except QueryRuntimeException: + with num_lock: + errors += 1 + + threads = [] + + for _ in range(10): + thread = threading.Thread(target=query_cycle) + thread.start() + threads.append(thread) + time.sleep(0.2) + + time.sleep(3) + + node_to_shutdown.query("SYSTEM STOP SWARM MODE") + + # enough time to complete processing of objects, started before "SYSTEM STOP SWARM MODE" + time.sleep(3) + + node_to_shutdown.stop_clickhouse(kill=True) + + for thread in threads: + thread.join() + + node_to_shutdown.start_clickhouse() + + assert errors == 0 diff --git a/tests/queries/0_stateless/01271_show_privileges.reference b/tests/queries/0_stateless/01271_show_privileges.reference index 716d0cd00634..c1278b6acda8 100644 --- a/tests/queries/0_stateless/01271_show_privileges.reference +++ b/tests/queries/0_stateless/01271_show_privileges.reference @@ -147,6 +147,7 @@ SYSTEM MERGES ['SYSTEM STOP MERGES','SYSTEM START MERGES','STOP MERGES','START M SYSTEM TTL MERGES ['SYSTEM STOP TTL MERGES','SYSTEM START TTL MERGES','STOP TTL MERGES','START TTL MERGES'] TABLE SYSTEM SYSTEM FETCHES ['SYSTEM STOP FETCHES','SYSTEM START FETCHES','STOP FETCHES','START FETCHES'] TABLE SYSTEM SYSTEM MOVES ['SYSTEM STOP MOVES','SYSTEM START MOVES','STOP MOVES','START MOVES'] TABLE SYSTEM +SYSTEM SWARM ['SYSTEM STOP SWARM MODE','SYSTEM START SWARM MODE','STOP SWARM MODE','START SWARM MODE'] GLOBAL SYSTEM SYSTEM PULLING REPLICATION LOG ['SYSTEM STOP PULLING REPLICATION LOG','SYSTEM START PULLING REPLICATION LOG'] TABLE SYSTEM SYSTEM CLEANUP ['SYSTEM STOP CLEANUP','SYSTEM START CLEANUP'] TABLE SYSTEM SYSTEM VIEWS ['SYSTEM REFRESH VIEW','SYSTEM START VIEWS','SYSTEM STOP VIEWS','SYSTEM START VIEW','SYSTEM STOP VIEW','SYSTEM CANCEL VIEW','REFRESH VIEW','START VIEWS','STOP VIEWS','START VIEW','STOP VIEW','CANCEL VIEW'] VIEW SYSTEM From 02517d71924ee12ce91685fcbacbabafb384760e Mon Sep 17 00:00:00 2001 From: Anton Ivashkin Date: Wed, 17 Sep 2025 17:09:41 +0200 Subject: [PATCH 5/7] Fix for restart on connection lost --- src/QueryPipeline/RemoteQueryExecutorReadContext.cpp | 1 + 1 file changed, 1 insertion(+) diff --git a/src/QueryPipeline/RemoteQueryExecutorReadContext.cpp b/src/QueryPipeline/RemoteQueryExecutorReadContext.cpp index 08216dffe443..bd9c0f4966e4 100644 --- a/src/QueryPipeline/RemoteQueryExecutorReadContext.cpp +++ b/src/QueryPipeline/RemoteQueryExecutorReadContext.cpp @@ -101,6 +101,7 @@ void RemoteQueryExecutorReadContext::Task::run(AsyncCallback async_callback, Sus throw; read_context.packet.type = Protocol::Server::ConnectionLost; read_context.packet.exception = std::make_unique(getCurrentExceptionMessageAndPattern(true), getCurrentExceptionCode()); + read_context.has_read_packet_part = PacketPart::Body; suspend_callback(); } } From 4c07216b859c6eff1dab1029f7e89f00abd36c4f Mon Sep 17 00:00:00 2001 From: Vasily Nemkov Date: Fri, 10 Oct 2025 01:12:25 +0200 Subject: [PATCH 6/7] Update SettingsChangesHistory.cpp --- src/Core/SettingsChangesHistory.cpp | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/Core/SettingsChangesHistory.cpp b/src/Core/SettingsChangesHistory.cpp index 9cfca7435db0..71e958f70490 100644 --- a/src/Core/SettingsChangesHistory.cpp +++ b/src/Core/SettingsChangesHistory.cpp @@ -275,8 +275,6 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory() {"parallel_replicas_for_cluster_engines", false, true, "New setting."}, {"parallel_hash_join_threshold", 0, 0, "New setting"}, /// Release closed. Please use 25.4 - {"use_object_storage_list_objects_cache", true, false, "New setting."}, - {"allow_retries_in_cluster_requests", false, false, "New setting."}, }); addSettingsChanges(settings_changes_history, "25.2", { From 7cd664e799896bdc9f47958a85ad85fa0773031b Mon Sep 17 00:00:00 2001 From: Vasily Nemkov Date: Fri, 10 Oct 2025 11:21:41 +0200 Subject: [PATCH 7/7] Update SettingsChangesHistory.cpp --- src/Core/SettingsChangesHistory.cpp | 1 + 1 file changed, 1 insertion(+) diff --git a/src/Core/SettingsChangesHistory.cpp b/src/Core/SettingsChangesHistory.cpp index 71e958f70490..923dddf1288e 100644 --- a/src/Core/SettingsChangesHistory.cpp +++ b/src/Core/SettingsChangesHistory.cpp @@ -44,6 +44,7 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory() {"lock_object_storage_task_distribution_ms", 500, 500, "Raised the value to 500 to avoid hoping tasks between executors."}, {"object_storage_cluster", "", "", "Antalya: New setting"}, {"object_storage_max_nodes", 0, 0, "Antalya: New setting"}, + {"allow_retries_in_cluster_requests", false, false, "Antalya: New setting"}, }); addSettingsChanges(settings_changes_history, "25.8", {