76 changes: 76 additions & 0 deletions ydb/core/blobstorage/nodewarden/blobstorage_node_warden_ut.cpp
@@ -7,10 +7,13 @@
#include <ydb/core/blobstorage/base/blobstorage_events.h>
#include <ydb/core/blobstorage/pdisk/blobstorage_pdisk_tools.h>
#include <ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut_http_request.h>
#include <ydb/core/blobstorage/vdisk/localrecovery/localrecovery_public.h>
#include <ydb/core/blobstorage/vdisk/common/vdisk_events.h>
#include <ydb/core/mind/bscontroller/bsc.h>
#include <ydb/core/util/actorsys_test/testactorsys.h>

#include <ydb/library/pdisk_io/sector_map.h>
#include <ydb/core/testlib/actors/block_events.h>
#include <ydb/core/util/random.h>

#include <google/protobuf/text_format.h>
@@ -1119,6 +1122,79 @@ Y_UNIT_TEST_SUITE(TBlobStorageWardenTest) {
CheckInferredPDiskSettings(runtime, fakeWhiteboard, fakeNodeWarden,
pdiskId, 12, 2u);
}

void ChangeGroupSizeInUnits(TTestBasicRuntime& runtime, TString poolName, ui32 groupId, ui32 groupSizeInUnits) {
TActorId edge = runtime.AllocateEdgeActor();

auto storagePool = DescribeStoragePool(runtime, poolName);
auto request = std::make_unique<TEvBlobStorage::TEvControllerConfigRequest>();
auto& cmd = *request->Record.MutableRequest()->AddCommand()->MutableChangeGroupSizeInUnits();
cmd.SetBoxId(storagePool.GetBoxId());
cmd.SetItemConfigGeneration(storagePool.GetItemConfigGeneration());
cmd.SetStoragePoolId(storagePool.GetStoragePoolId());
cmd.AddGroupId(groupId);
cmd.SetSizeInUnits(groupSizeInUnits);

NTabletPipe::TClientConfig pipeConfig;
pipeConfig.RetryPolicy = NTabletPipe::TClientRetryPolicy::WithRetries();
runtime.SendToPipe(MakeBSControllerID(), edge, request.release(), 0, pipeConfig);

auto reply = runtime.GrabEdgeEventRethrow<TEvBlobStorage::TEvControllerConfigResponse>(edge);
VERBOSE_COUT("TEvControllerConfigResponse# " << reply->ToString());
UNIT_ASSERT_VALUES_EQUAL(reply->Get()->Record.GetResponse().GetSuccess(), true);
}

void CheckVDiskStateUpdate(TTestBasicRuntime& runtime, TActorId fakeWhiteboard, ui32 groupId,
ui32 expectedGroupGeneration, ui32 expectedGroupSizeInUnits,
TDuration simTimeout = TDuration::Seconds(10)) {
TInstant deadline = runtime.GetCurrentTime() + simTimeout;
while (true) {
UNIT_ASSERT_LT(runtime.GetCurrentTime(), deadline);

const auto ev = runtime.GrabEdgeEventRethrow<NNodeWhiteboard::TEvWhiteboard::TEvVDiskStateUpdate>(fakeWhiteboard, deadline - runtime.GetCurrentTime());
VERBOSE_COUT(" Got TEvVDiskStateUpdate# " << ev->ToString());

NKikimrWhiteboard::TVDiskStateInfo vdiskInfo = ev->Get()->Record;
if (vdiskInfo.GetVDiskId().GetGroupID() != groupId || !vdiskInfo.HasGroupSizeInUnits()) {
continue;
}

UNIT_ASSERT_VALUES_EQUAL(vdiskInfo.GetVDiskId().GetGroupGeneration(), expectedGroupGeneration);
UNIT_ASSERT_VALUES_EQUAL(vdiskInfo.GetGroupSizeInUnits(), expectedGroupSizeInUnits);
break;
}
}

CUSTOM_UNIT_TEST(TestEvVGenerationChangeRace) {
TTestBasicRuntime runtime(1, false);
Setup(runtime, "", nullptr);
runtime.SetLogPriority(NKikimrServices::BS_PROXY, NLog::PRI_ERROR);
runtime.SetLogPriority(NKikimrServices::BS_PROXY_PUT, NLog::PRI_ERROR);
runtime.SetLogPriority(NKikimrServices::BS_PROXY_BLOCK, NLog::PRI_ERROR);
runtime.SetLogPriority(NKikimrServices::BS_SKELETON, NLog::PRI_INFO);
runtime.SetLogPriority(NKikimrServices::BS_LOCALRECOVERY, NLog::PRI_INFO);
runtime.SetLogPriority(NKikimrServices::BS_NODE, NLog::PRI_INFO);
runtime.SetLogPriority(NKikimrServices::BS_CONTROLLER, NLog::PRI_INFO);

const ui32 nodeId = runtime.GetNodeId(0);
TActorId fakeWhiteboard = runtime.AllocateEdgeActor();
runtime.RegisterService(NNodeWhiteboard::MakeNodeWhiteboardServiceId(nodeId), fakeWhiteboard);

VERBOSE_COUT(" Starting test");

TBlockEvents<TEvBlobStorage::TEvLocalRecoveryDone> block(runtime);

const TString poolName = "testEvVGenerationChangeRace";
CreateStoragePool(runtime, poolName, "pool-kind-1");
ui32 groupId = GetGroupFromPool(runtime, poolName);

CheckVDiskStateUpdate(runtime, fakeWhiteboard, groupId, 1, 0u);
ChangeGroupSizeInUnits(runtime, poolName, groupId, 2u);
CheckVDiskStateUpdate(runtime, fakeWhiteboard, groupId, 1, 0u);
block.Stop().Unblock();
CheckVDiskStateUpdate(runtime, fakeWhiteboard, groupId, 2, 2u);
}

}

} // namespace NBlobStorageNodeWardenTest
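
The race this test exercises: the group's SizeInUnits is changed via ChangeGroupSizeInUnits while the VDisk's TEvLocalRecoveryDone is held back by TBlockEvents, so the whiteboard must keep reporting generation 1 with 0 units until recovery is released, and only then switch to generation 2 with 2 units. For comparison, a hypothetical non-raced counterpart using the same helpers could look like the sketch below (test and pool names are illustrative, not part of this change):

    // Hypothetical sketch, not part of this change: same flow without blocking
    // TEvLocalRecoveryDone, so the resize should surface on the whiteboard directly.
    CUSTOM_UNIT_TEST(TestEvVGenerationChangeNoRace) {
        TTestBasicRuntime runtime(1, false);
        Setup(runtime, "", nullptr);

        const ui32 nodeId = runtime.GetNodeId(0);
        TActorId fakeWhiteboard = runtime.AllocateEdgeActor();
        runtime.RegisterService(NNodeWhiteboard::MakeNodeWhiteboardServiceId(nodeId), fakeWhiteboard);

        const TString poolName = "testEvVGenerationChangeNoRace"; // illustrative name
        CreateStoragePool(runtime, poolName, "pool-kind-1");
        ui32 groupId = GetGroupFromPool(runtime, poolName);

        // With recovery not blocked, the bumped generation and new size are expected
        // to be reported without an intermediate stale state.
        CheckVDiskStateUpdate(runtime, fakeWhiteboard, groupId, 1, 0u);
        ChangeGroupSizeInUnits(runtime, poolName, groupId, 2u);
        CheckVDiskStateUpdate(runtime, fakeWhiteboard, groupId, 2, 2u);
    }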
14 changes: 13 additions & 1 deletion ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeleton.cpp
@@ -1927,6 +1927,17 @@ namespace NKikimr {
ApplyHugeBlobSize(Config->MinHugeBlobInBytes);
Y_VERIFY_S(MinHugeBlobInBytes, VCtx->VDiskLogPrefix);

if (Config->GroupSizeInUnits != GInfo->GroupSizeInUnits) {
Config->GroupSizeInUnits = GInfo->GroupSizeInUnits;
Y_VERIFY(PDiskCtx);
Y_VERIFY(PDiskCtx->Dsk);
ctx.Send(PDiskCtx->PDiskId,
new NPDisk::TEvYardResize(
PDiskCtx->Dsk->Owner,
PDiskCtx->Dsk->OwnerRound,
Config->GroupSizeInUnits));
}

// handle special case when donor disk starts and finds out that it has been wiped out
if (ev->Get()->LsnMngr->GetOriginallyRecoveredLsn() == 0 && Config->BaseInfo.DonorMode) {
// send drop donor cmd to NodeWarden
@@ -2444,10 +2455,11 @@
GInfo = msg->NewInfo;
SelfVDiskId = msg->NewVDiskId;

if (Config->GroupSizeInUnits != GInfo->GroupSizeInUnits) {
if (PDiskCtx && Config->GroupSizeInUnits != GInfo->GroupSizeInUnits) {
Config->GroupSizeInUnits = GInfo->GroupSizeInUnits;
UpdateWhiteboard(ctx);

Y_VERIFY(PDiskCtx->Dsk);
ctx.Send(PDiskCtx->PDiskId,
new NPDisk::TEvYardResize(
PDiskCtx->Dsk->Owner,
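
Taken together, the two skeleton hunks cover both orderings: the first issues NPDisk::TEvYardResize once local recovery has produced a PDisk owner (hence the plain Y_VERIFY on PDiskCtx), and the second now guards the generation-change path with a PDiskCtx check, so a TEvVGenerationChange arriving before recovery completes no longer sends a resize with an unset PDisk context — the race covered by TestEvVGenerationChangeRace above. A hypothetical way to keep the two call sites in sync would be a small helper like the sketch below (member and method names assumed, not part of this change; the whiteboard update in the generation-change path would stay at its call site):

    // Hypothetical helper, not part of this change: issue TEvYardResize only when
    // a PDisk owner is already established and the size actually changed.
    void MaybeResizeYard(const TActorContext& ctx) {
        if (!PDiskCtx || Config->GroupSizeInUnits == GInfo->GroupSizeInUnits) {
            return; // no owner yet (recovery still running) or nothing changed
        }
        Config->GroupSizeInUnits = GInfo->GroupSizeInUnits;
        Y_VERIFY(PDiskCtx->Dsk);
        ctx.Send(PDiskCtx->PDiskId,
            new NPDisk::TEvYardResize(
                PDiskCtx->Dsk->Owner,
                PDiskCtx->Dsk->OwnerRound,
                Config->GroupSizeInUnits));
    }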
172 changes: 172 additions & 0 deletions ydb/tests/compatibility/test_infer_pdisk_expected_slot_count.py
@@ -0,0 +1,172 @@
# -*- coding: utf-8 -*-
import pytest
import time
import logging

from ydb.tests.library.compatibility.fixtures import RestartToAnotherVersionFixture
from ydb.tests.library.compatibility.fixtures import init_stable_binary_path, init_stable_name
from ydb.tests.library.compatibility.fixtures import inter_stable_binary_path, inter_stable_name
from ydb.tests.library.compatibility.fixtures import current_binary_path, current_name
from ydb.tests.library.common.types import Erasure
from ydb.core.protos import blobstorage_config_pb2

logger = logging.getLogger(__name__)

CONST_PDISK_PATH = "SectorMap:TestInferPDiskSettings:480"
CONST_EXPECTED_SLOT_COUNT = 14
CONST_480_GB = 480 * 1024**3
CONST_10_GB = 10 * 1024**3

all_binary_combinations_restart = [
[init_stable_binary_path, inter_stable_binary_path],
[inter_stable_binary_path, current_binary_path],
[init_stable_binary_path, current_binary_path],
]
all_binary_combinations_ids_restart = [
"restart_{}_to_{}".format(init_stable_name, inter_stable_name),
"restart_{}_to_{}".format(inter_stable_name, current_name),
"restart_{}_to_{}".format(init_stable_name, current_name),
]


@pytest.mark.parametrize("base_setup",
argvalues=all_binary_combinations_restart,
ids=all_binary_combinations_ids_restart,
indirect=True)
class TestUpgradeThenRollback(RestartToAnotherVersionFixture):
@pytest.fixture(autouse=True, scope="function")
def setup(self):
cluster_generator = self.setup_cluster(
erasure=Erasure.NONE,
nodes=2,
use_in_memory_pdisks=False)
next(cluster_generator)

host_configs = self.cluster.client.read_host_configs()
for host_config in host_configs:
drive = host_config.Drive.add()
drive.Path = CONST_PDISK_PATH
drive.PDiskConfig.ExpectedSlotCount = CONST_EXPECTED_SLOT_COUNT
self.cluster.client.define_host_configs(host_configs)

yield

def pdisk_list(self):
"""Equivalent to `dstool pdisk list`"""
base_config = self.cluster.client.query_base_config()

# Collect PDisk information
pdisks_info = []
for pdisk in base_config.BaseConfig.PDisk:
if pdisk.Path != CONST_PDISK_PATH:
continue
pdisks_info.append(pdisk)
return pdisks_info

def wait_and_check_pdisk_list(self, check_pdisks_fn, deadline, delay=1):
while True:
pdisks = self.pdisk_list()
try:
check_pdisks_fn(pdisks)
logger.info(f"pdisk_list good: {pdisks}")
return
except AssertionError as e:
if time.time() > deadline:
logger.warning(f"pdisk_list incorrect: {pdisks}")
raise e
else:
time.sleep(delay)

def test(self):
assert self.current_binary_paths_index == 0
logger.info(f"Test started on {self.versions[0]} {time.time()=}")
#################################################################

t1 = time.time()
timeout = 20

def check_pdisks(pdisks):
for pdisk in pdisks:
assert pdisk.Path == CONST_PDISK_PATH
assert pdisk.PDiskConfig.ExpectedSlotCount == CONST_EXPECTED_SLOT_COUNT
assert pdisk.DriveStatus == blobstorage_config_pb2.EDriveStatus.ACTIVE
assert pdisk.PDiskMetrics.TotalSize == CONST_480_GB
if self.versions[0] < (25, 3):
assert not pdisk.PDiskMetrics.HasField('SlotCount')
assert not pdisk.PDiskMetrics.HasField('SlotSizeInUnits')
else:
assert pdisk.PDiskMetrics.SlotCount == CONST_EXPECTED_SLOT_COUNT
assert pdisk.PDiskMetrics.HasField('SlotSizeInUnits') and \
pdisk.PDiskMetrics.SlotSizeInUnits == 0
assert pdisk.PDiskMetrics.UpdateTimestamp * 1e-6 > t1
assert pdisk.PDiskMetrics.UpdateTimestamp * 1e-6 < t1 + timeout
self.wait_and_check_pdisk_list(check_pdisks, deadline=t1+timeout)

self.change_cluster_version()
assert self.current_binary_paths_index == 1
logger.info(f"Restarted on version {self.versions[1]} {time.time()=}")
######################################################################

t2 = time.time()
host_configs = self.cluster.client.read_host_configs()
for host_config in host_configs:
drive = host_config.Drive[1]
assert drive.Path == CONST_PDISK_PATH
drive.ClearField('PDiskConfig')
drive.PDiskConfig.SetInParent()
drive.InferPDiskSlotCountFromUnitSize = CONST_10_GB
drive.InferPDiskSlotCountMax = 32
self.cluster.client.define_host_configs(host_configs)
logger.info(f"Inferred PDisk setting applied {time.time()=}")

self.cluster.client.pdisk_set_all_active(pdisk_path=CONST_PDISK_PATH)
logger.info(f"Drives activated {time.time()=}")

deadline = time.time() + timeout

def check_pdisks(pdisks):
for pdisk in pdisks:
assert pdisk.Path == CONST_PDISK_PATH
assert pdisk.DriveStatus == blobstorage_config_pb2.EDriveStatus.ACTIVE
assert not pdisk.HasField('PDiskConfig')
assert pdisk.ExpectedSlotCount == 16 # hardcoded default
assert pdisk.PDiskMetrics.TotalSize == CONST_480_GB
assert pdisk.PDiskMetrics.SlotCount == 24
assert pdisk.PDiskMetrics.SlotSizeInUnits == 2
assert pdisk.InferPDiskSlotCountFromUnitSize == CONST_10_GB
assert pdisk.InferPDiskSlotCountMax == 32
assert pdisk.PDiskMetrics.UpdateTimestamp * 1e-6 > t2
assert pdisk.PDiskMetrics.UpdateTimestamp * 1e-6 < deadline
self.wait_and_check_pdisk_list(check_pdisks, deadline)

t3 = time.time()
self.change_cluster_version()
assert self.current_binary_paths_index == 0
logger.info(f"Restarted back on version {self.versions[0]} {time.time()=}")
###########################################################################

self.cluster.client.pdisk_set_all_active(pdisk_path=CONST_PDISK_PATH)
logger.info(f"Drives activated {time.time()=}")

deadline = time.time() + timeout

def check_pdisks(pdisks):
for pdisk in pdisks:
assert pdisk.Path == CONST_PDISK_PATH
assert pdisk.DriveStatus == blobstorage_config_pb2.EDriveStatus.ACTIVE
assert not pdisk.HasField('PDiskConfig')
assert pdisk.ExpectedSlotCount == 16 # hardcoded default
assert pdisk.PDiskMetrics.TotalSize == CONST_480_GB
if self.versions[0] < (25, 3):
assert not pdisk.PDiskMetrics.HasField('SlotCount')
assert not pdisk.PDiskMetrics.HasField('SlotSizeInUnits')
assert pdisk.InferPDiskSlotCountFromUnitSize == 0
assert pdisk.InferPDiskSlotCountMax == 0
else:
assert pdisk.PDiskMetrics.HasField('SlotCount') and pdisk.PDiskMetrics.SlotCount == 24
assert pdisk.PDiskMetrics.HasField('SlotSizeInUnits') and pdisk.PDiskMetrics.SlotSizeInUnits == 2
assert pdisk.InferPDiskSlotCountFromUnitSize == CONST_10_GB
assert pdisk.InferPDiskSlotCountMax == 32
assert pdisk.PDiskMetrics.UpdateTimestamp * 1e-6 > t3
assert pdisk.PDiskMetrics.UpdateTimestamp * 1e-6 < deadline
self.wait_and_check_pdisk_list(check_pdisks, deadline)
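
A note on the version-gated assertions above: before the upgrade, and again after the rollback, the branches encode that binaries older than 25-3 report neither SlotCount/SlotSizeInUnits in PDiskMetrics nor the InferPDiskSlotCountFromUnitSize/InferPDiskSlotCountMax settings, whereas a 25-3+ binary keeps reporting the inferred layout (24 slots of 2 units each on the 480 GB sector-map disk) across the downgrade.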
1 change: 1 addition & 0 deletions ydb/tests/compatibility/ya.make
@@ -26,6 +26,7 @@ TEST_SRCS(
test_node_broker_delta_protocol.py
test_table_schema_compatibility.py
test_workload_manager.py
test_infer_pdisk_expected_slot_count.py
udf/test_datetime2.py
udf/test_digest.py
udf/test_digest_regression.py