@@ -717,9 +717,9 @@ void Cluster::initMisc()
717
717
}
718
718
}
719
719
720
- std::unique_ptr<Cluster> Cluster::getClusterWithReplicasAsShards (const Settings & settings, size_t max_replicas_from_shard) const
720
+ std::unique_ptr<Cluster> Cluster::getClusterWithReplicasAsShards (const Settings & settings, size_t max_replicas_from_shard, size_t max_hosts ) const
721
721
{
722
- return std::unique_ptr<Cluster>{ new Cluster (ReplicasAsShardsTag{}, *this , settings, max_replicas_from_shard)};
722
+ return std::unique_ptr<Cluster>{ new Cluster (ReplicasAsShardsTag{}, *this , settings, max_replicas_from_shard, max_hosts )};
723
723
}
724
724
725
725
std::unique_ptr<Cluster> Cluster::getClusterWithSingleShard (size_t index) const
@@ -768,7 +768,7 @@ void shuffleReplicas(std::vector<Cluster::Address> & replicas, const Settings &
768
768
769
769
}
770
770
771
- Cluster::Cluster (Cluster::ReplicasAsShardsTag, const Cluster & from, const Settings & settings, size_t max_replicas_from_shard)
771
+ Cluster::Cluster (Cluster::ReplicasAsShardsTag, const Cluster & from, const Settings & settings, size_t max_replicas_from_shard, size_t max_hosts )
772
772
{
773
773
if (from.addresses_with_failover .empty ())
774
774
throw Exception (ErrorCodes::LOGICAL_ERROR, " Cluster is empty" );
@@ -790,6 +790,7 @@ Cluster::Cluster(Cluster::ReplicasAsShardsTag, const Cluster & from, const Setti
790
790
791
791
if (address.is_local )
792
792
info.local_addresses .push_back (address);
793
+ addresses_with_failover.emplace_back (Addresses ({address}));
793
794
794
795
auto pool = ConnectionPoolFactory::instance ().get (
795
796
static_cast <unsigned >(settings[Setting::distributed_connections_pool_size]),
@@ -811,9 +812,6 @@ Cluster::Cluster(Cluster::ReplicasAsShardsTag, const Cluster & from, const Setti
811
812
info.pool = std::make_shared<ConnectionPoolWithFailover>(ConnectionPoolPtrs{pool}, settings[Setting::load_balancing]);
812
813
info.per_replica_pools = {std::move (pool)};
813
814
814
- addresses_with_failover.emplace_back (Addresses{address});
815
-
816
- slot_to_shard.insert (std::end (slot_to_shard), info.weight , shards_info.size ());
817
815
shards_info.emplace_back (std::move (info));
818
816
}
819
817
};
@@ -835,10 +833,37 @@ Cluster::Cluster(Cluster::ReplicasAsShardsTag, const Cluster & from, const Setti
835
833
secret = from.secret ;
836
834
name = from.name ;
837
835
836
+ constrainShardInfoAndAddressesToMaxHosts (max_hosts);
837
+
838
+ for (size_t i = 0 ; i < shards_info.size (); ++i)
839
+ slot_to_shard.insert (std::end (slot_to_shard), shards_info[i].weight , i);
840
+
838
841
initMisc ();
839
842
}
840
843
841
844
845
+ void Cluster::constrainShardInfoAndAddressesToMaxHosts (size_t max_hosts)
846
+ {
847
+ if (max_hosts == 0 || shards_info.size () <= max_hosts)
848
+ return ;
849
+
850
+ pcg64_fast gen{randomSeed ()};
851
+ std::shuffle (shards_info.begin (), shards_info.end (), gen);
852
+ shards_info.resize (max_hosts);
853
+
854
+ AddressesWithFailover addresses_with_failover_;
855
+
856
+ UInt32 shard_num = 0 ;
857
+ for (auto & shard_info : shards_info)
858
+ {
859
+ addresses_with_failover_.push_back (addresses_with_failover[shard_info.shard_num - 1 ]);
860
+ shard_info.shard_num = ++shard_num;
861
+ }
862
+
863
+ addresses_with_failover.swap (addresses_with_failover_);
864
+ }
865
+
866
+
842
867
Cluster::Cluster (Cluster::SubclusterTag, const Cluster & from, const std::vector<size_t > & indices)
843
868
{
844
869
for (size_t index : indices)
0 commit comments