Skip to content

Commit c6cb238

Browse files
authored
Merge pull request #750 from Altinity/features/fix_addresses_with_failover
Fix addresses_with_failover in cluster requests
2 parents 00e5ea7 + 7955796 commit c6cb238

File tree

2 files changed

+27
-13
lines changed

2 files changed

+27
-13
lines changed

src/Interpreters/Cluster.cpp

Lines changed: 24 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -790,6 +790,7 @@ Cluster::Cluster(Cluster::ReplicasAsShardsTag, const Cluster & from, const Setti
790790

791791
if (address.is_local)
792792
info.local_addresses.push_back(address);
793+
addresses_with_failover.emplace_back(Addresses({address}));
793794

794795
auto pool = ConnectionPoolFactory::instance().get(
795796
static_cast<unsigned>(settings[Setting::distributed_connections_pool_size]),
@@ -832,27 +833,37 @@ Cluster::Cluster(Cluster::ReplicasAsShardsTag, const Cluster & from, const Setti
832833
secret = from.secret;
833834
name = from.name;
834835

835-
if (max_hosts > 0 && shards_info.size() > max_hosts)
836-
{
837-
pcg64_fast gen{randomSeed()};
838-
std::shuffle(shards_info.begin(), shards_info.end(), gen);
839-
shards_info.resize(max_hosts);
840-
841-
shard_num = 0;
842-
for (auto & shard_info : shards_info)
843-
shard_info.shard_num = ++shard_num;
844-
}
836+
constrainShardInfoAndAddressesToMaxHosts(max_hosts);
845837

846838
for (size_t i = 0; i < shards_info.size(); ++i)
847-
{
848-
addresses_with_failover.emplace_back(shards_info[i].local_addresses);
849839
slot_to_shard.insert(std::end(slot_to_shard), shards_info[i].weight, i);
850-
}
851840

852841
initMisc();
853842
}
854843

855844

845+
void Cluster::constrainShardInfoAndAddressesToMaxHosts(size_t max_hosts)
846+
{
847+
if (max_hosts == 0 || shards_info.size() <= max_hosts)
848+
return;
849+
850+
pcg64_fast gen{randomSeed()};
851+
std::shuffle(shards_info.begin(), shards_info.end(), gen);
852+
shards_info.resize(max_hosts);
853+
854+
AddressesWithFailover addresses_with_failover_;
855+
856+
UInt32 shard_num = 0;
857+
for (auto & shard_info : shards_info)
858+
{
859+
addresses_with_failover_.push_back(addresses_with_failover[shard_info.shard_num - 1]);
860+
shard_info.shard_num = ++shard_num;
861+
}
862+
863+
addresses_with_failover.swap(addresses_with_failover_);
864+
}
865+
866+
856867
Cluster::Cluster(Cluster::SubclusterTag, const Cluster & from, const std::vector<size_t> & indices)
857868
{
858869
for (size_t index : indices)

src/Interpreters/Cluster.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -304,6 +304,9 @@ class Cluster
304304
ShardInfoInsertPathForInternalReplication insert_paths = {},
305305
bool internal_replication = false);
306306

307+
/// Reduce size of cluster to max_hosts
308+
void constrainShardInfoAndAddressesToMaxHosts(size_t max_hosts);
309+
307310
/// Inter-server secret
308311
String secret;
309312

0 commit comments

Comments
 (0)