@@ -732,9 +732,9 @@ void Cluster::initMisc()
732
732
}
733
733
}
734
734
735
- std::unique_ptr<Cluster> Cluster::getClusterWithReplicasAsShards (const Settings & settings, size_t max_replicas_from_shard) const
735
+ std::unique_ptr<Cluster> Cluster::getClusterWithReplicasAsShards (const Settings & settings, size_t max_replicas_from_shard, size_t max_hosts ) const
736
736
{
737
- return std::unique_ptr<Cluster>{ new Cluster (ReplicasAsShardsTag{}, *this , settings, max_replicas_from_shard)};
737
+ return std::unique_ptr<Cluster>{ new Cluster (ReplicasAsShardsTag{}, *this , settings, max_replicas_from_shard, max_hosts )};
738
738
}
739
739
740
740
std::unique_ptr<Cluster> Cluster::getClusterWithSingleShard (size_t index) const
@@ -783,7 +783,7 @@ void shuffleReplicas(std::vector<Cluster::Address> & replicas, const Settings &
783
783
784
784
}
785
785
786
- Cluster::Cluster (Cluster::ReplicasAsShardsTag, const Cluster & from, const Settings & settings, size_t max_replicas_from_shard)
786
+ Cluster::Cluster (Cluster::ReplicasAsShardsTag, const Cluster & from, const Settings & settings, size_t max_replicas_from_shard, size_t max_hosts )
787
787
{
788
788
if (from.addresses_with_failover .empty ())
789
789
throw Exception (ErrorCodes::LOGICAL_ERROR, " Cluster is empty" );
@@ -805,6 +805,7 @@ Cluster::Cluster(Cluster::ReplicasAsShardsTag, const Cluster & from, const Setti
805
805
806
806
if (address.is_local )
807
807
info.local_addresses .push_back (address);
808
+ addresses_with_failover.emplace_back (Addresses ({address}));
808
809
809
810
auto pool = ConnectionPoolFactory::instance ().get (
810
811
static_cast <unsigned >(settings[Setting::distributed_connections_pool_size]),
@@ -828,9 +829,6 @@ Cluster::Cluster(Cluster::ReplicasAsShardsTag, const Cluster & from, const Setti
828
829
info.per_replica_pools = {std::move (pool)};
829
830
info.default_database = address.default_database ;
830
831
831
- addresses_with_failover.emplace_back (Addresses{address});
832
-
833
- slot_to_shard.insert (std::end (slot_to_shard), info.weight , shards_info.size ());
834
832
shards_info.emplace_back (std::move (info));
835
833
}
836
834
};
@@ -852,10 +850,37 @@ Cluster::Cluster(Cluster::ReplicasAsShardsTag, const Cluster & from, const Setti
852
850
secret = from.secret ;
853
851
name = from.name ;
854
852
853
+ constrainShardInfoAndAddressesToMaxHosts (max_hosts);
854
+
855
+ for (size_t i = 0 ; i < shards_info.size (); ++i)
856
+ slot_to_shard.insert (std::end (slot_to_shard), shards_info[i].weight , i);
857
+
855
858
initMisc ();
856
859
}
857
860
858
861
862
+ void Cluster::constrainShardInfoAndAddressesToMaxHosts (size_t max_hosts)
863
+ {
864
+ if (max_hosts == 0 || shards_info.size () <= max_hosts)
865
+ return ;
866
+
867
+ pcg64_fast gen{randomSeed ()};
868
+ std::shuffle (shards_info.begin (), shards_info.end (), gen);
869
+ shards_info.resize (max_hosts);
870
+
871
+ AddressesWithFailover addresses_with_failover_;
872
+
873
+ UInt32 shard_num = 0 ;
874
+ for (auto & shard_info : shards_info)
875
+ {
876
+ addresses_with_failover_.push_back (addresses_with_failover[shard_info.shard_num - 1 ]);
877
+ shard_info.shard_num = ++shard_num;
878
+ }
879
+
880
+ addresses_with_failover.swap (addresses_with_failover_);
881
+ }
882
+
883
+
859
884
Cluster::Cluster (Cluster::SubclusterTag, const Cluster & from, const std::vector<size_t > & indices)
860
885
{
861
886
for (size_t index : indices)
0 commit comments