6 changes: 6 additions & 0 deletions docs/en/sql-reference/statements/system.md
@@ -206,6 +206,12 @@ SYSTEM RELOAD USERS [ON CLUSTER cluster_name]

Normally shuts down ClickHouse (like `service clickhouse-server stop` / `kill {$pid_clickhouse-server}`)

## PRESHUTDOWN {#preshutdown}

<CloudNotSupportedBadge/>

Prepares the node for a graceful shutdown: unregisters it from auto-discovered clusters and stops accepting distributed requests to object storages (`s3Cluster`, `icebergCluster`, etc.).

## KILL {#kill}

Aborts ClickHouse process (like `kill -9 {$ pid_clickhouse-server}`)
4 changes: 4 additions & 0 deletions programs/server/Server.cpp
@@ -2329,6 +2329,8 @@ try

}

global_context->startSwarmMode();
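    /// From this point on, cluster auto-discovery is allowed to register this node in dynamic
    /// clusters (ClusterDiscovery::registerInZk checks Context::isSwarmModeEnabled()).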

{
std::lock_guard lock(servers_lock);
/// We should start interserver communications before (and more important shutdown after) tables.
@@ -2777,6 +2779,8 @@ try

is_cancelled = true;

global_context->stopSwarmMode();
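    /// Swarm mode is switched off before connections are drained so that cluster auto-discovery
    /// does not re-register this node during shutdown (see the isSwarmModeEnabled() checks in
    /// ClusterDiscovery::upsertCluster and ClusterDiscovery::registerInZk).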

LOG_DEBUG(log, "Waiting for current connections to close.");

size_t current_connections = 0;
1 change: 1 addition & 0 deletions src/Access/Common/AccessType.h
@@ -334,6 +334,7 @@ enum class AccessType : uint8_t
M(SYSTEM_TTL_MERGES, "SYSTEM STOP TTL MERGES, SYSTEM START TTL MERGES, STOP TTL MERGES, START TTL MERGES", TABLE, SYSTEM) \
M(SYSTEM_FETCHES, "SYSTEM STOP FETCHES, SYSTEM START FETCHES, STOP FETCHES, START FETCHES", TABLE, SYSTEM) \
M(SYSTEM_MOVES, "SYSTEM STOP MOVES, SYSTEM START MOVES, STOP MOVES, START MOVES", TABLE, SYSTEM) \
M(SYSTEM_SWARM, "SYSTEM STOP SWARM MODE, SYSTEM START SWARM MODE, STOP SWARM MODE, START SWARM MODE", GLOBAL, SYSTEM) \
M(SYSTEM_PULLING_REPLICATION_LOG, "SYSTEM STOP PULLING REPLICATION LOG, SYSTEM START PULLING REPLICATION LOG", TABLE, SYSTEM) \
M(SYSTEM_CLEANUP, "SYSTEM STOP CLEANUP, SYSTEM START CLEANUP", TABLE, SYSTEM) \
M(SYSTEM_VIEWS, "SYSTEM REFRESH VIEW, SYSTEM START VIEWS, SYSTEM STOP VIEWS, SYSTEM START VIEW, SYSTEM STOP VIEW, SYSTEM CANCEL VIEW, REFRESH VIEW, START VIEWS, STOP VIEWS, START VIEW, STOP VIEW, CANCEL VIEW", VIEW, SYSTEM) \
1 change: 1 addition & 0 deletions src/Common/CurrentMetrics.cpp
@@ -432,6 +432,7 @@
M(StartupScriptsExecutionState, "State of startup scripts execution: 0 = not finished, 1 = success, 2 = failure.") \
\
M(IsServerShuttingDown, "Indicates if the server is shutting down: 0 = no, 1 = yes") \
M(IsSwarmModeEnabled, "Indicates whether swarm mode is enabled: 0 = disabled, 1 = enabled") \
\
M(StatelessWorkerThreads, "Number of threads in the stateless worker thread pool.") \
M(StatelessWorkerThreadsActive, "Number of threads in the stateless worker thread pool running a task.") \
2 changes: 2 additions & 0 deletions src/Core/Protocol.h
@@ -96,8 +96,10 @@ namespace Protocol
MergeTreeReadTaskRequest = 16, /// Request from a MergeTree replica to a coordinator
TimezoneUpdate = 17, /// Receive server's (session-wide) default timezone
SSHChallenge = 18, /// Return challenge for SSH signature signing

MAX = SSHChallenge,

ConnectionLost = 255, /// Exception that occurred on the client side.
};

/// NOTE: If the type of packet argument would be Enum, the comparison packet >= 0 && packet < 10
16 changes: 16 additions & 0 deletions src/Core/Settings.cpp
@@ -7010,6 +7010,19 @@ Default number of tasks for parallel reading in distributed query. Tasks are spr
DECLARE(Bool, distributed_plan_optimize_exchanges, true, R"(
Removes unnecessary exchanges in distributed query plan. Disable it for debugging.
)", 0) \
DECLARE(UInt64, lock_object_storage_task_distribution_ms, 500, R"(
In object storage cluster queries, do not hand over tasks to non-prefetched nodes while the node that prefetched them is still active.
Determines how long a free executor node (one that has finished processing all of its assigned tasks) waits before "stealing" tasks from the queue of a currently busy executor node.

Possible values:

- 0 - steal tasks immediately after freeing up.
- >0 - wait for the specified number of milliseconds before stealing tasks.

Setting this `>0` helps with cache reuse and might improve overall query time: the busy node may have warmed-up caches for the task, while a free node would have to fetch a lot of data from S3, which can take longer than simply waiting for the busy node and also generates extra traffic.
)", EXPERIMENTAL) \
DECLARE(String, distributed_plan_force_exchange_kind, "", R"(
Force specified kind of Exchange operators between distributed query stages.

@@ -7033,6 +7046,9 @@ DECLARE(Bool, allow_experimental_ytsaurus_dictionary_source, false, R"(
)", EXPERIMENTAL) \
DECLARE(Bool, distributed_plan_force_shuffle_aggregation, false, R"(
Use Shuffle aggregation strategy instead of PartialAggregation + Merge in distributed query plan.
)", EXPERIMENTAL) \
DECLARE(Bool, allow_retries_in_cluster_requests, false, R"(
Allow retries in cluster requests when one node goes offline.
)", EXPERIMENTAL) \
\
/** Experimental timeSeries* aggregate functions. */ \
8 changes: 8 additions & 0 deletions src/Core/SettingsChangesHistory.cpp
@@ -41,6 +41,7 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory()
/// Note: please check if the key already exists to prevent duplicate entries.
addSettingsChanges(settings_changes_history, "25.8.9.2000",
{
{"lock_object_storage_task_distribution_ms", 500, 500, "Raised the value to 500 to avoid hoping tasks between executors."},
{"object_storage_cluster", "", "", "Antalya: New setting"},
{"object_storage_max_nodes", 0, 0, "Antalya: New setting"},
});
@@ -137,6 +138,13 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory()
{"distributed_plan_force_shuffle_aggregation", 0, 0, "New experimental setting"},
{"allow_experimental_insert_into_iceberg", false, false, "New setting."},
/// RELEASE CLOSED
{"allow_experimental_database_iceberg", false, true, "Turned ON by default for Antalya"},
{"allow_experimental_database_unity_catalog", false, true, "Turned ON by default for Antalya"},
{"allow_experimental_database_glue_catalog", false, true, "Turned ON by default for Antalya"},
{"output_format_parquet_enum_as_byte_array", true, true, "Enable writing Enum as byte array in Parquet by default"},
{"lock_object_storage_task_distribution_ms", 0, 0, "New setting."},
{"object_storage_cluster", "", "", "New setting"},
{"object_storage_max_nodes", 0, 0, "New setting"},
});
addSettingsChanges(settings_changes_history, "25.6.5.2000",
{
36 changes: 36 additions & 0 deletions src/Disks/ObjectStorages/IObjectStorage.cpp
@@ -8,6 +8,10 @@
#include <Common/Exception.h>
#include <Common/ObjectStorageKeyGenerator.h>

#include <Poco/JSON/Object.h>
#include <Poco/JSON/Parser.h>
#include <Poco/JSON/JSONException.h>


namespace DB
{
@@ -104,4 +108,36 @@ std::string RelativePathWithMetadata::getPathOrPathToArchiveIfArchive() const
return getPath();
}

RelativePathWithMetadata::CommandInTaskResponse::CommandInTaskResponse(const std::string & task)
{
Poco::JSON::Parser parser;
try
{
auto json = parser.parse(task).extract<Poco::JSON::Object::Ptr>();
if (!json)
return;

successfully_parsed = true;

if (json->has("retry_after_us"))
retry_after_us = json->getValue<size_t>("retry_after_us");
}
catch (const Poco::JSON::JSONException &)
{ /// Not a JSON
return;
}
}

std::string RelativePathWithMetadata::CommandInTaskResponse::to_string() const
{
Poco::JSON::Object json;
if (retry_after_us.has_value())
json.set("retry_after_us", retry_after_us.value());

std::ostringstream oss;
oss.exceptions(std::ios::failbit);
Poco::JSON::Stringifier::stringify(json, oss);
return oss.str();
}
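
/// Illustrative usage (not part of this change): a coordinator can answer a task request with a
/// JSON command such as {"retry_after_us":100000} instead of an object path, and a worker can
/// tell the two cases apart via CommandInTaskResponse. The helper below is hypothetical and unused.
[[maybe_unused]] static std::optional<Poco::Timestamp::TimeDiff> exampleGetRetryPause(const std::string & task_string)
{
    /// If task_string parses as JSON, it is treated as a command; otherwise it is a plain path.
    RelativePathWithMetadata entry(task_string);
    if (entry.getCommand().is_parsed())
        return entry.getCommand().get_retry_after_us();
    return std::nullopt; /// entry.relative_path now holds a real object path to process
}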

}
33 changes: 29 additions & 4 deletions src/Disks/ObjectStorages/IObjectStorage.h
@@ -111,18 +111,41 @@ struct DataLakeObjectMetadata;

struct RelativePathWithMetadata
{
class CommandInTaskResponse
{
public:
CommandInTaskResponse() = default;
explicit CommandInTaskResponse(const std::string & task);

bool is_parsed() const { return successfully_parsed; }
void set_retry_after_us(Poco::Timestamp::TimeDiff time_us) { retry_after_us = time_us; }

std::string to_string() const;

std::optional<Poco::Timestamp::TimeDiff> get_retry_after_us() const { return retry_after_us; }

private:
bool successfully_parsed = false;
std::optional<Poco::Timestamp::TimeDiff> retry_after_us;
};

String relative_path;
/// Object metadata: size, modification time, etc.
std::optional<ObjectMetadata> metadata;
/// Delta lake related object metadata.
std::optional<DataLakeObjectMetadata> data_lake_metadata;
/// Command embedded in the task string (e.g. a request to retry after a short pause)
CommandInTaskResponse command;

RelativePathWithMetadata() = default;

explicit RelativePathWithMetadata(String relative_path_, std::optional<ObjectMetadata> metadata_ = std::nullopt)
: relative_path(std::move(relative_path_))
, metadata(std::move(metadata_))
{}
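/// Interprets task_string either as a JSON command from the coordinator (see
/// CommandInTaskResponse) or, if it does not parse as JSON, as a plain relative path.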
explicit RelativePathWithMetadata(const String & task_string, std::optional<ObjectMetadata> metadata_ = std::nullopt)
: metadata(std::move(metadata_))
, command(task_string)
{
if (!command.is_parsed())
relative_path = task_string;
}

RelativePathWithMetadata(const RelativePathWithMetadata & other) = default;

@@ -134,6 +157,8 @@ struct RelativePathWithMetadata
virtual std::string getPathToArchive() const { throw Exception(ErrorCodes::LOGICAL_ERROR, "Not an archive"); }
virtual size_t fileSizeInArchive() const { throw Exception(ErrorCodes::LOGICAL_ERROR, "Not an archive"); }
virtual std::string getPathOrPathToArchiveIfArchive() const;

const CommandInTaskResponse & getCommand() const { return command; }
};

struct ObjectKeyWithMetadata
62 changes: 61 additions & 1 deletion src/Interpreters/ClusterDiscovery.cpp
@@ -108,6 +108,13 @@ class ClusterDiscovery::Flags
cv.notify_one();
}

void wakeup()
{
std::unique_lock<std::mutex> lk(mu);
any_need_update = true;
cv.notify_one();
}

private:
std::condition_variable cv;
std::mutex mu;
@@ -391,7 +398,9 @@ bool ClusterDiscovery::upsertCluster(ClusterInfo & cluster_info)
return true;
};

if (!cluster_info.current_node_is_observer && !contains(node_uuids, current_node_name))
if (!cluster_info.current_node_is_observer
&& context->isSwarmModeEnabled()
&& !contains(node_uuids, current_node_name))
{
LOG_ERROR(log, "Can't find current node in cluster '{}', will register again", cluster_info.name);
registerInZk(zk, cluster_info);
@@ -455,12 +464,30 @@ void ClusterDiscovery::registerInZk(zkutil::ZooKeeperPtr & zk, ClusterInfo & info)
return;
}

if (!context->isSwarmModeEnabled())
{
LOG_DEBUG(log, "STOP SWARM MODE called, skip self-registering current node {} in cluster {}", current_node_name, info.name);
return;
}

LOG_DEBUG(log, "Registering current node {} in cluster {}", current_node_name, info.name);

zk->createOrUpdate(node_path, info.current_node.serialize(), zkutil::CreateMode::Ephemeral);
LOG_DEBUG(log, "Current node {} registered in cluster {}", current_node_name, info.name);
}

void ClusterDiscovery::unregisterFromZk(zkutil::ZooKeeperPtr & zk, ClusterInfo & info)
{
if (info.current_node_is_observer)
return;

String node_path = getShardsListPath(info.zk_root) / current_node_name;
LOG_DEBUG(log, "Removing current node {} from cluster {}", current_node_name, info.name);

zk->remove(node_path);
LOG_DEBUG(log, "Current node {} removed from cluster {}", current_node_name, info.name);
}

void ClusterDiscovery::initialUpdate()
{
LOG_DEBUG(log, "Initializing");
@@ -506,6 +533,18 @@
is_initialized = true;
}

void ClusterDiscovery::registerAll()
{
register_change_flag = RegisterChangeFlag::RCF_REGISTER_ALL;
clusters_to_update->wakeup();
}

void ClusterDiscovery::unregisterAll()
{
register_change_flag = RegisterChangeFlag::RCF_UNREGISTER_ALL;
clusters_to_update->wakeup();
}
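
/// Note: registerAll()/unregisterAll() are asynchronous requests: they only record the desired
/// action in register_change_flag and wake the worker via clusters_to_update->wakeup(); the
/// actual ZooKeeper (un)registration is performed by runMainThread() when it observes the flag.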

void ClusterDiscovery::findDynamicClusters(
std::unordered_map<String, ClusterDiscovery::ClusterInfo> & info,
std::unordered_set<size_t> * unchanged_roots)
@@ -729,6 +768,27 @@ bool ClusterDiscovery::runMainThread(std::function<void()> up_to_date_callback)
{
up_to_date_callback();
}

RegisterChangeFlag flag = register_change_flag.exchange(RegisterChangeFlag::RCF_NONE);

if (flag == RegisterChangeFlag::RCF_REGISTER_ALL)
{
LOG_DEBUG(log, "Register in all dynamic clusters");
for (auto & [_, info] : clusters_info)
{
auto zk = context->getDefaultOrAuxiliaryZooKeeper(info.zk_name);
registerInZk(zk, info);
}
}
else if (flag == RegisterChangeFlag::RCF_UNREGISTER_ALL)
{
LOG_DEBUG(log, "Unregister in all dynamic clusters");
for (auto & [_, info] : clusters_info)
{
auto zk = context->getDefaultOrAuxiliaryZooKeeper(info.zk_name);
unregisterFromZk(zk, info);
}
}
}
LOG_DEBUG(log, "Worker thread stopped");
return finished;
13 changes: 13 additions & 0 deletions src/Interpreters/ClusterDiscovery.h
@@ -38,6 +38,9 @@ class ClusterDiscovery

~ClusterDiscovery();

void registerAll();
void unregisterAll();

private:
struct NodeInfo
{
@@ -125,6 +128,7 @@ class ClusterDiscovery
void initialUpdate();

void registerInZk(zkutil::ZooKeeperPtr & zk, ClusterInfo & info);
void unregisterFromZk(zkutil::ZooKeeperPtr & zk, ClusterInfo & info);

Strings getNodeNames(zkutil::ZooKeeperPtr & zk,
const String & zk_root,
@@ -207,6 +211,15 @@ class ClusterDiscovery
std::shared_ptr<std::vector<std::shared_ptr<MulticlusterDiscovery>>> multicluster_discovery_paths;

MultiVersion<Macros>::Version macros;

enum RegisterChangeFlag
{
RCF_NONE,
RCF_REGISTER_ALL,
RCF_UNREGISTER_ALL,
};

std::atomic<RegisterChangeFlag> register_change_flag = RegisterChangeFlag::RCF_NONE;
};

}