Skip to content

Commit 0693e71

Browse files
Enmkianton-ru
authored and committed
Merge pull request #952 from Altinity/feature/antalya-25.6.5/rendezvous_hashing
25.6.5 Antalya port of #709, #760, #866 - Rendezvous hashing
1 parent 61c1d5c commit 0693e71

14 files changed

+529
-27
lines changed

src/Core/Settings.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7001,6 +7001,9 @@ Default number of tasks for parallel reading in distributed query. Tasks are spr
70017001
DECLARE(Bool, distributed_plan_optimize_exchanges, true, R"(
70027002
Removes unnecessary exchanges in distributed query plan. Disable it for debugging.
70037003
)", 0) \
7004+
DECLARE(UInt64, lock_object_storage_task_distribution_ms, 0, R"(
In object storage distribution queries do not distribute tasks on non-prefetched nodes until prefetched node is active.
)", EXPERIMENTAL) \
70047007
DECLARE(String, distributed_plan_force_exchange_kind, "", R"(
70057008
Force specified kind of Exchange operators between distributed query stages.
70067009

src/Core/SettingsChangesHistory.cpp

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,13 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory()
132132
{"distributed_plan_force_shuffle_aggregation", 0, 0, "New experimental setting"},
133133
{"allow_experimental_insert_into_iceberg", false, false, "New setting."},
134134
/// RELEASE CLOSED
135+
{"allow_experimental_database_iceberg", false, true, "Turned ON by default for Antalya"},
136+
{"allow_experimental_database_unity_catalog", false, true, "Turned ON by default for Antalya"},
137+
{"allow_experimental_database_glue_catalog", false, true, "Turned ON by default for Antalya"},
138+
{"output_format_parquet_enum_as_byte_array", true, true, "Enable writing Enum as byte array in Parquet by default"},
139+
{"lock_object_storage_task_distribution_ms", 0, 0, "New setting."},
140+
{"object_storage_cluster", "", "", "New setting"},
141+
{"object_storage_max_nodes", 0, 0, "New setting"},
135142
});
136143
addSettingsChanges(settings_changes_history, "25.6",
137144
{

src/Disks/ObjectStorages/IObjectStorage.cpp

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,10 @@
88
#include <Common/Exception.h>
99
#include <Common/ObjectStorageKeyGenerator.h>
1010

11+
#include <Poco/JSON/Object.h>
12+
#include <Poco/JSON/Parser.h>
13+
#include <Poco/JSON/JSONException.h>
14+
1115

1216
namespace DB
1317
{
@@ -104,4 +108,36 @@ std::string RelativePathWithMetadata::getPathOrPathToArchiveIfArchive() const
104108
return getPath();
105109
}
106110

111+
/// Tries to interpret `task` as a JSON-encoded command.
///
/// On success is_parsed() becomes true and the recognised fields (currently only
/// "retry_after_us") are stored. When `task` is not a JSON object, or "retry_after_us"
/// cannot be used as a non-negative delay, the instance keeps its default state
/// (is_parsed() == false, no retry delay).
RelativePathWithMetadata::CommandInTaskResponse::CommandInTaskResponse(const std::string & task)
{
    Poco::JSON::Parser parser;
    try
    {
        auto json = parser.parse(task).extract<Poco::JSON::Object::Ptr>();
        if (!json)
            return;

        std::optional<Poco::Timestamp::TimeDiff> parsed_retry_after_us;
        if (json->has("retry_after_us"))
        {
            /// Read the value with the member's own signed type, so that a huge unsigned
            /// number cannot silently wrap around into a negative delay.
            const auto delay_us = json->getValue<Poco::Timestamp::TimeDiff>("retry_after_us");
            if (delay_us < 0)
                return; /// A negative delay is meaningless: do not treat the string as a command.
            parsed_retry_after_us = delay_us;
        }

        /// Commit the state only after every field was validated,
        /// so the object is never left half-initialised.
        retry_after_us = parsed_retry_after_us;
        successfully_parsed = true;
    }
    catch (const Poco::JSON::JSONException &)
    { /// Not a JSON
        return;
    }
    catch (const Poco::BadCastException &)
    { /// Valid JSON, but not an object (array, scalar), or the field has an unconvertible type
        return;
    }
    catch (const Poco::InvalidAccessException &)
    { /// Parser produced an empty value, nothing to extract
        return;
    }
}
130+
131+
std::string RelativePathWithMetadata::CommandInTaskResponse::to_string() const
132+
{
133+
Poco::JSON::Object json;
134+
if (retry_after_us.has_value())
135+
json.set("retry_after_us", retry_after_us.value());
136+
137+
std::ostringstream oss;
138+
oss.exceptions(std::ios::failbit);
139+
Poco::JSON::Stringifier::stringify(json, oss);
140+
return oss.str();
141+
}
142+
107143
}

src/Disks/ObjectStorages/IObjectStorage.h

Lines changed: 29 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -111,18 +111,41 @@ struct DataLakeObjectMetadata;
111111

112112
struct RelativePathWithMetadata
113113
{
114+
class CommandInTaskResponse
115+
{
116+
public:
117+
CommandInTaskResponse() = default;
118+
explicit CommandInTaskResponse(const std::string & task);
119+
120+
bool is_parsed() const { return successfully_parsed; }
121+
void set_retry_after_us(Poco::Timestamp::TimeDiff time_us) { retry_after_us = time_us; }
122+
123+
std::string to_string() const;
124+
125+
std::optional<Poco::Timestamp::TimeDiff> get_retry_after_us() const { return retry_after_us; }
126+
127+
private:
128+
bool successfully_parsed = false;
129+
std::optional<Poco::Timestamp::TimeDiff> retry_after_us;
130+
};
131+
114132
String relative_path;
115133
/// Object metadata: size, modification time, etc.
116134
std::optional<ObjectMetadata> metadata;
117135
/// Delta lake related object metadata.
118136
std::optional<DataLakeObjectMetadata> data_lake_metadata;
137+
/// Retry request after short pause
138+
CommandInTaskResponse command;
119139

120140
RelativePathWithMetadata() = default;
121141

122-
explicit RelativePathWithMetadata(String relative_path_, std::optional<ObjectMetadata> metadata_ = std::nullopt)
123-
: relative_path(std::move(relative_path_))
124-
, metadata(std::move(metadata_))
125-
{}
142+
explicit RelativePathWithMetadata(const String & task_string, std::optional<ObjectMetadata> metadata_ = std::nullopt)
143+
: metadata(std::move(metadata_))
144+
, command(task_string)
145+
{
146+
if (!command.is_parsed())
147+
relative_path = task_string;
148+
}
126149

127150
RelativePathWithMetadata(const RelativePathWithMetadata & other) = default;
128151

@@ -134,6 +157,8 @@ struct RelativePathWithMetadata
134157
virtual std::string getPathToArchive() const { throw Exception(ErrorCodes::LOGICAL_ERROR, "Not an archive"); }
135158
virtual size_t fileSizeInArchive() const { throw Exception(ErrorCodes::LOGICAL_ERROR, "Not an archive"); }
136159
virtual std::string getPathOrPathToArchiveIfArchive() const;
160+
161+
/// Read-only access to the command stored with this object (see is_parsed() on the result).
const CommandInTaskResponse & getCommand() const { return command; }
137162
};
138163

139164
struct ObjectKeyWithMetadata

src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,11 +24,13 @@ namespace Setting
2424
{
2525
extern const SettingsBool use_hive_partitioning;
2626
extern const SettingsBool cluster_function_process_archive_on_multiple_nodes;
27+
extern const SettingsUInt64 lock_object_storage_task_distribution_ms;
2728
}
2829

2930
namespace ErrorCodes
3031
{
3132
extern const int LOGICAL_ERROR;
33+
extern const int INVALID_SETTING_VALUE;
3234
}
3335

3436
String StorageObjectStorageCluster::getPathSample(ContextPtr context)
@@ -224,10 +226,23 @@ RemoteQueryExecutor::Extension StorageObjectStorageCluster::getTaskIteratorExten
224226
}
225227
}
226228

229+
uint64_t lock_object_storage_task_distribution_ms = local_context->getSettingsRef()[Setting::lock_object_storage_task_distribution_ms];
230+
231+
/// Check value to avoid negative result after conversion in microseconds.
232+
/// Poco::Timestamp::TimeDiff is signed int 64.
233+
static const uint64_t lock_object_storage_task_distribution_ms_max = 0x0020000000000000ULL;
234+
if (lock_object_storage_task_distribution_ms > lock_object_storage_task_distribution_ms_max)
235+
throw Exception(ErrorCodes::INVALID_SETTING_VALUE,
236+
"Value lock_object_storage_task_distribution_ms is too big: {}, allowed maximum is {}",
237+
lock_object_storage_task_distribution_ms,
238+
lock_object_storage_task_distribution_ms_max
239+
);
240+
227241
auto task_distributor = std::make_shared<StorageObjectStorageStableTaskDistributor>(
228242
iterator,
229243
std::move(ids_of_hosts),
230-
/* send_over_whole_archive */!local_context->getSettingsRef()[Setting::cluster_function_process_archive_on_multiple_nodes]);
244+
/* send_over_whole_archive */!local_context->getSettingsRef()[Setting::cluster_function_process_archive_on_multiple_nodes],
245+
lock_object_storage_task_distribution_ms);
231246

232247
auto callback = std::make_shared<TaskIterator>(
233248
[task_distributor, local_context](size_t number_of_current_replica) mutable -> ClusterFunctionReadTaskResponsePtr

src/Storages/ObjectStorage/StorageObjectStorageSource.cpp

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
#include <Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h>
2828
#include <Storages/ObjectStorage/StorageObjectStorage.h>
2929
#include <Storages/ObjectStorage/StorageObjectStorageSource.h>
30+
#include <Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.h>
3031
#include <Storages/ObjectStorage/Utils.h>
3132
#include <Storages/VirtualColumnUtils.h>
3233
#include <Common/SipHash.h>
@@ -445,11 +446,31 @@ StorageObjectStorageSource::ReaderHolder StorageObjectStorageSource::createReade
445446
ObjectInfoPtr object_info;
446447
auto query_settings = configuration->getQuerySettings(context_);
447448

449+
bool not_a_path = false;
450+
448451
do
449452
{
453+
not_a_path = false;
450454
object_info = file_iterator->next(processor);
451455

452-
if (!object_info || object_info->getPath().empty())
456+
if (!object_info)
457+
return {};
458+
459+
if (object_info->getCommand().is_parsed())
460+
{
461+
auto retry_after_us = object_info->getCommand().get_retry_after_us();
462+
if (retry_after_us.has_value())
463+
{
464+
not_a_path = true;
465+
/// TODO: Make asyncronous waiting without sleep in thread
466+
/// Now this sleep is on executor node in worker thread
467+
/// Does not block query initiator
468+
sleepForMicroseconds(std::min(Poco::Timestamp::TimeDiff(100000ul), retry_after_us.value()));
469+
continue;
470+
}
471+
}
472+
473+
if (object_info->getPath().empty())
453474
return {};
454475

455476
if (!object_info->metadata)
@@ -468,7 +489,7 @@ StorageObjectStorageSource::ReaderHolder StorageObjectStorageSource::createReade
468489
object_info->metadata = object_storage->getObjectMetadata(path);
469490
}
470491
}
471-
while (query_settings.skip_empty_files && object_info->metadata->size_bytes == 0);
492+
while (not_a_path || (query_settings.skip_empty_files && object_info->metadata->size_bytes == 0));
472493

473494
QueryPipelineBuilder builder;
474495
std::shared_ptr<ISource> source;

src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp

Lines changed: 52 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,13 @@ namespace ErrorCodes
1414
StorageObjectStorageStableTaskDistributor::StorageObjectStorageStableTaskDistributor(
1515
std::shared_ptr<IObjectIterator> iterator_,
1616
std::vector<std::string> && ids_of_nodes_,
17-
bool send_over_whole_archive_)
17+
bool send_over_whole_archive_,
18+
uint64_t lock_object_storage_task_distribution_ms_)
1819
: iterator(std::move(iterator_))
1920
, send_over_whole_archive(send_over_whole_archive_)
2021
, connection_to_files(ids_of_nodes_.size())
2122
, ids_of_nodes(std::move(ids_of_nodes_))
23+
, lock_object_storage_task_distribution_us(lock_object_storage_task_distribution_ms_ * 1000)
2224
, iterator_exhausted(false)
2325
{
2426
}
@@ -27,6 +29,8 @@ ObjectInfoPtr StorageObjectStorageStableTaskDistributor::getNextTask(size_t numb
2729
{
2830
LOG_TRACE(log, "Received request from replica {} looking for a file", number_of_current_replica);
2931

32+
saveLastNodeActivity(number_of_current_replica);
33+
3034
// 1. Check pre-queued files first
3135
if (auto file = getPreQueuedFile(number_of_current_replica))
3236
return file;
@@ -159,7 +163,7 @@ ObjectInfoPtr StorageObjectStorageStableTaskDistributor::getMatchingFileFromIter
159163
// Queue file for its assigned replica
160164
{
161165
std::lock_guard lock(mutex);
162-
unprocessed_files.emplace(file_path, object_info);
166+
unprocessed_files.emplace(file_path, std::make_pair(object_info, number_of_current_replica));
163167
connection_to_files[file_replica_idx].push_back(object_info);
164168
}
165169
}
@@ -169,26 +173,65 @@ ObjectInfoPtr StorageObjectStorageStableTaskDistributor::getMatchingFileFromIter
169173

170174
ObjectInfoPtr StorageObjectStorageStableTaskDistributor::getAnyUnprocessedFile(size_t number_of_current_replica)
171175
{
176+
/// Limit time of node activity to keep task in queue
177+
Poco::Timestamp activity_limit;
178+
Poco::Timestamp oldest_activity;
179+
if (lock_object_storage_task_distribution_us > 0)
180+
activity_limit -= lock_object_storage_task_distribution_us;
181+
172182
std::lock_guard lock(mutex);
173183

174184
if (!unprocessed_files.empty())
175185
{
176186
auto it = unprocessed_files.begin();
177-
auto next_file = it->second;
178-
unprocessed_files.erase(it);
179187

180-
auto file_path = send_over_whole_archive ? next_file->getPathOrPathToArchiveIfArchive() : next_file->getPath();
188+
while (it != unprocessed_files.end())
189+
{
190+
auto last_activity = last_node_activity.find(it->second.second);
191+
if (lock_object_storage_task_distribution_us <= 0
192+
|| last_activity == last_node_activity.end()
193+
|| activity_limit > last_activity->second)
194+
{
195+
auto next_file = it->second.first;
196+
unprocessed_files.erase(it);
197+
198+
auto file_path = send_over_whole_archive ? next_file->getPathOrPathToArchiveIfArchive() : next_file->getPath();
199+
LOG_TRACE(
200+
log,
201+
"Iterator exhausted. Assigning unprocessed file {} to replica {}",
202+
file_path,
203+
number_of_current_replica
204+
);
205+
206+
return next_file;
207+
}
208+
209+
oldest_activity = std::min(oldest_activity, last_activity->second);
210+
++it;
211+
}
212+
181213
LOG_TRACE(
182214
log,
183-
"Iterator exhausted. Assigning unprocessed file {} to replica {}",
184-
file_path,
185-
number_of_current_replica
215+
"No unprocessed file for replica {}, need to retry after {} us",
216+
number_of_current_replica,
217+
oldest_activity - activity_limit
186218
);
187219

188-
return next_file;
220+
/// All unprocessed files owned by alive replicas with recenlty activity
221+
/// Need to retry after (oldest_activity - activity_limit) microseconds
222+
RelativePathWithMetadata::CommandInTaskResponse response;
223+
response.set_retry_after_us(oldest_activity - activity_limit);
224+
return std::make_shared<ObjectInfo>(response.to_string());
189225
}
190226

191227
return {};
192228
}
193229

230+
void StorageObjectStorageStableTaskDistributor::saveLastNodeActivity(size_t number_of_current_replica)
231+
{
232+
Poco::Timestamp now;
233+
std::lock_guard lock(mutex);
234+
last_node_activity[number_of_current_replica] = now;
235+
}
236+
194237
}

src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.h

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,12 @@
44
#include <Common/Logger.h>
55
#include <Interpreters/Cluster.h>
66
#include <Storages/ObjectStorage/StorageObjectStorageSource.h>
7+
#include <Storages/ObjectStorageQueue/ObjectStorageQueueSource.h>
8+
9+
#include <Poco/Timestamp.h>
10+
711
#include <unordered_set>
12+
#include <unordered_map>
813
#include <vector>
914
#include <mutex>
1015
#include <memory>
@@ -18,7 +23,8 @@ class StorageObjectStorageStableTaskDistributor
1823
StorageObjectStorageStableTaskDistributor(
1924
std::shared_ptr<IObjectIterator> iterator_,
2025
std::vector<std::string> && ids_of_nodes_,
21-
bool send_over_whole_archive_);
26+
bool send_over_whole_archive_,
27+
uint64_t lock_object_storage_task_distribution_ms_);
2228

2329
ObjectInfoPtr getNextTask(size_t number_of_current_replica);
2430

@@ -28,14 +34,19 @@ class StorageObjectStorageStableTaskDistributor
2834
ObjectInfoPtr getMatchingFileFromIterator(size_t number_of_current_replica);
2935
ObjectInfoPtr getAnyUnprocessedFile(size_t number_of_current_replica);
3036

37+
void saveLastNodeActivity(size_t number_of_current_replica);
38+
3139
const std::shared_ptr<IObjectIterator> iterator;
3240
const bool send_over_whole_archive;
3341

3442
std::vector<std::vector<ObjectInfoPtr>> connection_to_files;
35-
std::unordered_map<std::string, ObjectInfoPtr> unprocessed_files;
43+
std::unordered_map<std::string, std::pair<ObjectInfoPtr, size_t>> unprocessed_files;
3644

3745
std::vector<std::string> ids_of_nodes;
3846

47+
std::unordered_map<size_t, Poco::Timestamp> last_node_activity;
48+
Poco::Timestamp::TimeDiff lock_object_storage_task_distribution_us;
49+
3950
std::mutex mutex;
4051
bool iterator_exhausted = false;
4152

0 commit comments

Comments
 (0)