Skip to content

Commit 9a4870c

Browse files
authored
Remove LocalPileQuorum (#29310)
1 parent 8bf4163 commit 9a4870c

File tree

5 files changed

+30
-82
lines changed

5 files changed

+30
-82
lines changed

ydb/core/blobstorage/nodewarden/distconf.cpp

Lines changed: 8 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -144,10 +144,6 @@ namespace NKikimr::NStorage {
144144

145145
QuorumValid = false;
146146

147-
if (BridgeInfo && !BridgeInfo->SelfNodePile->IsPrimary) {
148-
UnbindNodesFromOtherPiles("not primary pile anymore");
149-
}
150-
151147
return true;
152148
} else if (StorageConfig->GetGeneration() && StorageConfig->GetGeneration() == config.GetGeneration() &&
153149
StorageConfig->GetFingerprint() != config.GetFingerprint()) {
@@ -231,24 +227,19 @@ namespace NKikimr::NStorage {
231227

232228
#ifndef NDEBUG
233229
void TDistributedConfigKeeper::ConsistencyCheck() {
230+
THashMap<ui32, TNodeIdentifier> allBoundNodesMap;
231+
for (const auto& [nodeIdentifier, info] : AllBoundNodes) {
232+
const auto [it, inserted] = allBoundNodesMap.emplace(nodeIdentifier.NodeId(), nodeIdentifier);
233+
Y_ABORT_UNLESS(inserted);
234+
}
234235
for (const auto& [nodeId, info] : DirectBoundNodes) { // validate incoming binding
235-
if (std::ranges::binary_search(NodeIdsForIncomingBinding, nodeId) ||
236-
std::ranges::binary_search(NodeIdsForOutgoingBinding, nodeId)) {
237-
continue; // okay
238-
} else if (BridgeInfo && BridgeInfo->SelfNodePile->IsPrimary && !NodesFromSamePile.contains(nodeId) &&
239-
AllNodeIds.contains(nodeId) && (!Binding || !Binding->RootNodeId)) {
240-
continue; // okay too -- other pile connecting to primary
241-
}
242-
Y_ABORT_S("unexpected incoming bound node NodeId# " << nodeId
243-
<< " NodeIdsForIncomingBinding# " << FormatList(NodeIdsForIncomingBinding)
244-
<< " NodeIdsForOutgoingBinding# " << FormatList(NodeIdsForOutgoingBinding)
245-
<< " NodesFromSamePile# " << FormatList(NodesFromSamePile)
246-
<< " Binding# " << (Binding ? Binding->ToString() : "<null>"));
236+
Y_ABORT_UNLESS(allBoundNodesMap.contains(nodeId));
237+
Y_ABORT_UNLESS(AllBoundNodes.contains(allBoundNodesMap[nodeId]));
247238
}
248239
if (Binding) { // validate outgoing binding
249240
Y_ABORT_UNLESS(std::ranges::binary_search(NodeIdsForOutgoingBinding, Binding->NodeId) ||
250241
std::ranges::binary_search(NodeIdsForIncomingBinding, Binding->NodeId) ||
251-
std::ranges::binary_search(NodeIdsForPrimaryPileOutgoingBinding, Binding->NodeId));
242+
std::ranges::binary_search(NodeIdsForOtherPilesOutgoingBinding, Binding->NodeId));
252243
}
253244

254245
for (const auto& [cookie, task] : ScatterTasks) {
@@ -353,8 +344,6 @@ namespace NKikimr::NStorage {
353344

354345
if (IsSelfStatic && StorageConfig && NodeListObtained) {
355346
Y_VERIFY_S(HasConnectedNodeQuorum(*StorageConfig) == GlobalQuorum, "GlobalQuorum# " << GlobalQuorum);
356-
//Y_VERIFY_S((BridgeInfo && HasConnectedNodeQuorum(*StorageConfig, true)) == LocalPileQuorum,
357-
// "LocalPileQuorum# " << LocalPileQuorum);
358347
}
359348

360349
if (Scepter) {

ydb/core/blobstorage/nodewarden/distconf.h

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -266,7 +266,7 @@ namespace NKikimr::NStorage {
266266
ui64 BindingCookie = RandomNumber<ui64>();
267267
TBindQueue BindQueue;
268268
TBindQueue RevBindQueue;
269-
TBindQueue PrimaryPileBindQueue;
269+
TBindQueue OtherPilesBindQueue;
270270
bool Scheduled = false;
271271

272272
// incoming bindings
@@ -282,7 +282,7 @@ namespace NKikimr::NStorage {
282282
std::deque<TAutoPtr<IEventHandle>> PendingEvents;
283283
std::vector<ui32> NodeIdsForOutgoingBinding;
284284
std::vector<ui32> NodeIdsForIncomingBinding;
285-
std::vector<ui32> NodeIdsForPrimaryPileOutgoingBinding;
285+
std::vector<ui32> NodeIdsForOtherPilesOutgoingBinding;
286286
THashMap<ui32, TNodeIdentifier> AllNodeIds;
287287
THashSet<ui32> NodesFromSamePile;
288288
TNodeIdentifier SelfNode;
@@ -319,7 +319,6 @@ namespace NKikimr::NStorage {
319319
ui64 ScepterCounter = 1; // increased every time Scepter gets changed
320320
TString ErrorReason;
321321
std::optional<TString> CurrentSelfAssemblyUUID;
322-
bool LocalPileQuorum = false;
323322
bool GlobalQuorum = false;
324323
bool QuorumValid = false;
325324

@@ -419,7 +418,6 @@ namespace NKikimr::NStorage {
419418
void HandleWakeup();
420419
void Handle(TEvNodeConfigReversePush::TPtr ev);
421420
void FanOutReversePush(const NKikimrBlobStorage::TStorageConfig *committedStorageConfig);
422-
void UnbindNodesFromOtherPiles(const char *reason);
423421

424422
////////////////////////////////////////////////////////////////////////////////////////////////////////////////
425423
// Binding requests from peer nodes

ydb/core/blobstorage/nodewarden/distconf_binding.cpp

Lines changed: 17 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ namespace NKikimr::NStorage {
6565

6666
std::vector<ui32> nodeIdsForOutgoingBinding;
6767
std::vector<ui32> nodeIdsForIncomingBinding;
68-
std::vector<ui32> nodeIdsForPrimaryPileOutgoingBinding;
68+
std::vector<ui32> nodeIdsForOtherPilesOutgoingBinding;
6969

7070
for (const auto& [item, location] : newNodeList) {
7171
const ui32 nodeId = item.NodeId();
@@ -83,8 +83,8 @@ namespace NKikimr::NStorage {
8383

8484
// check if node is located in primary pile (and this one is not the primary) -- then it is suitable for
8585
// binding to primary pile
86-
if (BridgeInfo && !BridgeInfo->SelfNodePile->IsPrimary && location.GetBridgePileName() == BridgeInfo->PrimaryPile->Name) {
87-
nodeIdsForPrimaryPileOutgoingBinding.push_back(item.NodeId());
86+
if (location.GetBridgePileName() != SelfNodeBridgePileName) {
87+
nodeIdsForOtherPilesOutgoingBinding.push_back(item.NodeId());
8888
}
8989
}
9090

@@ -111,15 +111,15 @@ namespace NKikimr::NStorage {
111111
};
112112
applyChanges(NodeIdsForOutgoingBinding, nodeIdsForOutgoingBinding);
113113
applyChanges(NodeIdsForIncomingBinding, nodeIdsForIncomingBinding);
114-
applyChanges(NodeIdsForPrimaryPileOutgoingBinding, nodeIdsForPrimaryPileOutgoingBinding);
114+
applyChanges(NodeIdsForOtherPilesOutgoingBinding, nodeIdsForOtherPilesOutgoingBinding);
115115
if (bindingReset) {
116116
AbortBinding("node vanished");
117117
}
118118

119119
// issue updates
120120
BindQueue.Update(NodeIdsForOutgoingBinding);
121121
RevBindQueue.Update(NodeIdsForIncomingBinding);
122-
PrimaryPileBindQueue.Update(NodeIdsForPrimaryPileOutgoingBinding);
122+
OtherPilesBindQueue.Update(NodeIdsForOtherPilesOutgoingBinding);
123123
}
124124

125125
TBridgePileId TDistributedConfigKeeper::ResolveNodePileId(const TNodeLocation& location) {
@@ -157,15 +157,13 @@ namespace NKikimr::NStorage {
157157
}
158158

159159
// no node from the same pile available, try to bind to primary pile (if we have quorum)
160-
TMonotonic primaryClosest = TMonotonic::Max();
161-
if (LocalPileQuorum && !BridgeInfo->SelfNodePile->IsPrimary) {
162-
if (std::optional<ui32> nodeIdFromPrimaryPile = PrimaryPileBindQueue.Pick(now, &primaryClosest)) {
163-
return StartBinding(*nodeIdFromPrimaryPile);
164-
}
160+
TMonotonic otherClosest = TMonotonic::Max();
161+
if (std::optional<ui32> nodeIdFromOtherPile = OtherPilesBindQueue.Pick(now, &otherClosest)) {
162+
return StartBinding(*nodeIdFromOtherPile);
165163
}
166164

167165
// nothing to bind to
168-
closest = Min(closest, revClosest, primaryClosest);
166+
closest = Min(closest, revClosest, otherClosest);
169167
if (closest != TMonotonic::Max() && !Scheduled) {
170168
STLOG(PRI_DEBUG, BS_NODE, NWDC30, "Delaying bind");
171169
TActivationContext::Schedule(closest, new IEventHandle(TEvents::TSystem::Wakeup, 0, SelfId(), {}, nullptr, 0));
@@ -488,7 +486,6 @@ namespace NKikimr::NStorage {
488486
// pending commands
489487
if (Binding && Binding->RootNodeId) {
490488
Y_ABORT_UNLESS(Binding->RootNodeId != SelfId().NodeId());
491-
UnbindNodesFromOtherPiles("root node has changed");
492489
std::ranges::for_each(std::exchange(InvokeOnRootPending, {}), std::bind(&TThis::HandleInvokeOnRoot, this,
493490
std::placeholders::_1));
494491
}
@@ -578,7 +575,8 @@ namespace NKikimr::NStorage {
578575
Y_ABORT_UNLESS(StorageConfig); // we must have the storage configuration by the time we can process this message
579576
if (record.GetInitial()) {
580577
for (const auto& item : record.GetBoundNodes()) {
581-
if (item.GetNodeId().GetNodeId() == senderNodeId) { // ensure that whole node identifier tuple matches
578+
const ui32 nodeId = item.GetNodeId().GetNodeId();
579+
if (nodeId == senderNodeId) { // ensure that whole node identifier tuple matches
582580
knownNode = it->second == TNodeIdentifier(item.GetNodeId());
583581
break;
584582
}
@@ -614,21 +612,12 @@ namespace NKikimr::NStorage {
614612
}
615613

616614
// check if this is connection from another pile
617-
if (record.GetInitial() && !NodesFromSamePile.contains(senderNodeId)) {
618-
Y_DEBUG_ABORT_UNLESS(BridgeInfo);
619-
if ((!Binding || !Binding->RootNodeId) && LocalPileQuorum && BridgeInfo->SelfNodePile->IsPrimary) {
620-
// we allow this node's connection as this is the primary pile AND we have majority of connected
621-
// nodes AND this is the root one
622-
} else {
623-
// this is either not the root node, or no quorum for connection
624-
auto response = TEvNodeConfigReversePush::MakeRejected();
625-
if (Binding && Binding->RootNodeId) {
626-
// command peer to join this specific node
627-
response->Record.SetRootNodeId(Binding->RootNodeId);
628-
}
629-
SendEvent(*ev, std::move(response));
630-
return;
631-
}
615+
if (record.GetInitial() && !NodesFromSamePile.contains(senderNodeId) && Binding && Binding->RootNodeId) {
616+
// command peer to join this specific node
617+
auto response = TEvNodeConfigReversePush::MakeRejected();
618+
response->Record.SetRootNodeId(Binding->RootNodeId);
619+
SendEvent(*ev, std::move(response));
620+
return;
632621
}
633622

634623
if (!record.GetInitial() && !DirectBoundNodes.contains(senderNodeId)) {
@@ -792,25 +781,4 @@ namespace NKikimr::NStorage {
792781
return Scepter || (Binding && GetRootNodeId() != SelfId().NodeId());
793782
}
794783

795-
void TDistributedConfigKeeper::UnbindNodesFromOtherPiles(const char *reason) {
796-
if (!BridgeInfo) {
797-
return;
798-
}
799-
800-
std::vector<ui32> goingToUnbind;
801-
for (const auto& [nodeId, info] : DirectBoundNodes) {
802-
if (BridgeInfo->GetPileForNode(nodeId) != BridgeInfo->SelfNodePile) {
803-
auto ev = TEvNodeConfigReversePush::MakeRejected();
804-
if (Binding && Binding->RootNodeId) { // inform about new root, if we have it
805-
ev->Record.SetRootNodeId(Binding->RootNodeId);
806-
}
807-
SendEvent(nodeId, info, std::move(ev));
808-
goingToUnbind.push_back(nodeId);
809-
}
810-
}
811-
for (ui32 nodeId : goingToUnbind) {
812-
UnbindNode(nodeId, reason);
813-
}
814-
}
815-
816784
} // NKikimr::NStorage

ydb/core/blobstorage/nodewarden/distconf_fsm.cpp

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,7 @@ namespace NKikimr::NStorage {
1616

1717
// recalculate global and local pile quorums
1818
Y_ABORT_UNLESS(StorageConfig);
19-
LocalPileQuorum = BridgeInfo && HasNodeQuorum(*StorageConfig, connected, BridgePileNameMap,
20-
BridgeInfo->SelfNodePile->BridgePileId, *Cfg, nullptr, true);
21-
GlobalQuorum = (!BridgeInfo || BridgeInfo->SelfNodePile->IsPrimary) && HasNodeQuorum(*StorageConfig,
22-
connected, BridgePileNameMap, TBridgePileId(), *Cfg, nullptr, true);
19+
GlobalQuorum = HasNodeQuorum(*StorageConfig, connected, BridgePileNameMap, TBridgePileId(), *Cfg, nullptr, true);
2320

2421
// recalculate unsynced piles' quorum too
2522
if (BridgeInfo) {
@@ -57,10 +54,6 @@ namespace NKikimr::NStorage {
5754
// if we have local pile quorum, then do not switch into error state, we'll start collecting configs locally
5855
SwitchToError("quorum lost");
5956
}
60-
61-
if (!LocalPileQuorum) {
62-
UnbindNodesFromOtherPiles("local pile quorum lost");
63-
}
6457
}
6558

6659
void TDistributedConfigKeeper::HandleRetryCollectConfigsAndPropose(STATEFN_SIG) {

ydb/core/blobstorage/nodewarden/distconf_mon.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -172,6 +172,7 @@ namespace NKikimr::NStorage {
172172
};
173173

174174
NJson::TJsonValue root = NJson::TJsonMap{
175+
{"self_node_id", SelfId().NodeId()},
175176
{"binding", getBinding()},
176177
{"direct_bound_nodes", getDirectBoundNodes()},
177178
{"all_bound_nodes", getAllBoundNodes()},
@@ -278,7 +279,7 @@ namespace NKikimr::NStorage {
278279
out << "Quorum: " << (GlobalQuorum ? "yes" : "no") << "<br/>";
279280
out << "Scepter: " << (Scepter ? ToString(Scepter->Id) : "null") << "<br/>";
280281
out << "NodeIdsForOutgoingBinding: " << FormatList(NodeIdsForOutgoingBinding) << "<br/>";
281-
out << "NodeIdsForPrimaryPileOutgoingBinding: " << FormatList(NodeIdsForPrimaryPileOutgoingBinding) << "<br/>";
282+
out << "NodeIdsForOtherPilesOutgoingBinding: " << FormatList(NodeIdsForOtherPilesOutgoingBinding) << "<br/>";
282283
}
283284
}
284285

@@ -301,7 +302,6 @@ namespace NKikimr::NStorage {
301302
DIV_CLASS("panel-body") {
302303
DIV() {
303304
out << "AllBoundNodes count: " << AllBoundNodes.size() << "<br/>";
304-
out << "LocalPileQuorum: " << (LocalPileQuorum ? "yes" : "no") << "<br/>";
305305
out << "GlobalQuorum: " << (GlobalQuorum ? "yes" : "no") << "<br/>";
306306
out << "NodeIdsForIncomingBinding: " << FormatList(NodeIdsForIncomingBinding) << "<br/>";
307307
out << "ConnectedUnsyncedPiles: " << FormatList(ConnectedUnsyncedPiles) << "<br/>";

0 commit comments

Comments
 (0)