Skip to content

Commit b44cedf

Browse files
committed
Fix without rdma + add test
1 parent 80bdd02 commit b44cedf

File tree

2 files changed

+44
-14
lines changed

2 files changed

+44
-14
lines changed

ydb/library/actors/interconnect/interconnect_channel.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -281,7 +281,7 @@ namespace NActors {
281281
std::optional<bool> complete = false;
282282
if (IsPartInline) {
283283
complete = FeedInlinePayload(task, event);
284-
} else if (IsPartRdma) {
284+
} else if (IsPartRdma && rdmaDeviceIndex >= 0) {
285285
complete = FeedRdmaPayload(task, event, rdmaDeviceIndex, task.Params.ChecksumRdmaEvent);
286286
} else {
287287
complete = FeedExternalPayload(task, event);

ydb/library/actors/interconnect/ut_rdma/rdma_xdc_ut.cpp

Lines changed: 43 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -92,15 +92,17 @@ class TReceiveActor: public TActorBootstrapped<TReceiveActor> {
9292
struct TEventsForTest {
9393
std::vector<TEvTestSerialization*> Events;
9494
std::unordered_map<ui64, std::function<void(TEvTestSerialization*)>> Checks;
95+
NMonitoring::TDynamicCounterPtr Counters;
9596
std::shared_ptr<NInterconnect::NRdma::IMemPool> MemPool;
9697

97-
TEventsForTest(ui32 numEvents)
98-
: MemPool(NInterconnect::NRdma::CreateDummyMemPool())
98+
TEventsForTest(ui32 numEvents, bool shuffle = false)
99+
: Counters(new NMonitoring::TDynamicCounters()),
100+
MemPool(NInterconnect::NRdma::CreateSlotMemPool(Counters.get()))
99101
{
100-
Generate(numEvents, MemPool.get());
102+
Generate(numEvents, MemPool.get(), shuffle);
101103
}
102104

103-
void Generate(ui32 numEvents, NInterconnect::NRdma::IMemPool* memPool) {
105+
void Generate(ui32 numEvents, NInterconnect::NRdma::IMemPool* memPool, bool shuffle = false) {
104106
for (ui32 i = 0; i < numEvents; ++i) {
105107
const bool isInline = i % 3 == 0;
106108
const bool isXdc = i % 3 == 1;
@@ -122,17 +124,27 @@ struct TEventsForTest {
122124
UNIT_ASSERT_VALUES_EQUAL(ev->GetPayload().back().size(), 5000 + j);
123125
}
124126
}
127+
if (shuffle) {
128+
for (ui32 j = 0; j < numPayloads; ++j) {
129+
ev->AddPayload(TRope(TString(10 + j, j + i)));
130+
ev->AddPayload(TRope(TString(5000 + j, j + i)));
131+
auto buf = memPool->AllocRcBuf(5000 + j, 0).value();
132+
std::fill(buf.GetDataMut(), buf.GetDataMut() + 5000 + j, j + i);
133+
ev->AddPayload(TRope(std::move(buf)));
134+
UNIT_ASSERT_VALUES_EQUAL(ev->GetPayload().back().size(), 5000 + j);
135+
}
136+
}
125137

126138
if (isXdc || isRdma) {
127139
UNIT_ASSERT(ev->AllowExternalDataChannel());
128140
}
129141

130142
Events.push_back(ev);
131143

132-
Checks.emplace(i, [i, numPayloads, isInline](TEvTestSerialization* ev) {
144+
Checks.emplace(i, [i, numPayloads, isInline, shuffle](TEvTestSerialization* ev) {
133145
UNIT_ASSERT_VALUES_EQUAL(ev->Record.GetBlobID(), i);
134146
UNIT_ASSERT_VALUES_EQUAL(ev->Record.GetBuffer(), TStringBuilder{} << "hello world " << i);
135-
UNIT_ASSERT_VALUES_EQUAL(ev->GetPayload().size(), numPayloads);
147+
UNIT_ASSERT_VALUES_EQUAL(ev->GetPayload().size(), numPayloads * (shuffle ? 4 : 1));
136148
for (ui32 j = 0; j < numPayloads; ++j) {
137149
ui32 payloadSize = isInline ? 10 + j : 5000 + j;
138150
UNIT_ASSERT_VALUES_EQUAL(ev->GetPayload()[j].GetSize(), payloadSize);
@@ -301,7 +313,6 @@ TEST_F(XdcRdmaTest, SendRdma) {
301313
auto* ev = MakeTestEvent(123, memPool.get());
302314

303315
auto recieverPtr = new TReceiveActor([](TEvTestSerialization::TPtr ev) {
304-
Cerr << "Blob ID: " << ev->Get()->Record.GetBlobID() << Endl;
305316
UNIT_ASSERT_VALUES_EQUAL(ev->Get()->Record.GetBlobID(), 123u);
306317
UNIT_ASSERT_VALUES_EQUAL(ev->Get()->Record.GetBuffer(), "hello world");
307318
UNIT_ASSERT_VALUES_EQUAL(ev->Get()->GetPayload().size(), 1u);
@@ -335,7 +346,6 @@ TEST_F(XdcRdmaTest, SendRdmaWithShuffledPayload) {
335346
}
336347

337348
auto recieverPtr = new TReceiveActor([](TEvTestSerialization::TPtr ev) {
338-
Cerr << "Blob ID: " << ev->Get()->Record.GetBlobID() << Endl;
339349
UNIT_ASSERT_VALUES_EQUAL(ev->Get()->Record.GetBlobID(), 123u);
340350
UNIT_ASSERT_VALUES_EQUAL(ev->Get()->Record.GetBuffer(), "hello world");
341351
UNIT_ASSERT_VALUES_EQUAL(ev->Get()->GetPayload().size(), 10u);
@@ -364,7 +374,6 @@ TEST_F(XdcRdmaTest, SendRdmaWithRegionOffset) {
364374
auto* ev = MakeTestEvent(123, memPool.get(), false, true);
365375

366376
auto recieverPtr = new TReceiveActor([](TEvTestSerialization::TPtr ev) {
367-
Cerr << "Blob ID: " << ev->Get()->Record.GetBlobID() << Endl;
368377
UNIT_ASSERT_VALUES_EQUAL(ev->Get()->Record.GetBlobID(), 123u);
369378
UNIT_ASSERT_VALUES_EQUAL(ev->Get()->Record.GetBuffer(), "hello world");
370379
UNIT_ASSERT_VALUES_EQUAL(ev->Get()->GetPayload().size(), 1u);
@@ -386,7 +395,6 @@ TEST_F(XdcRdmaTest, SendRdmaWithGlueWithRegionOffset) {
386395
auto* ev = MakeTestEvent(123, memPool.get(), true, true);
387396

388397
auto recieverPtr = new TReceiveActor([](TEvTestSerialization::TPtr ev) {
389-
Cerr << "Blob ID: " << ev->Get()->Record.GetBlobID() << Endl;
390398
UNIT_ASSERT_VALUES_EQUAL(ev->Get()->Record.GetBlobID(), 123u);
391399
UNIT_ASSERT_VALUES_EQUAL(ev->Get()->Record.GetBuffer(), "hello world");
392400
UNIT_ASSERT_VALUES_EQUAL(ev->Get()->GetPayload().size(), 2u);
@@ -412,7 +420,6 @@ TEST_F(XdcRdmaTest, SendRdmaWithGlue) {
412420
auto* ev = MakeTestEvent(123, memPool.get(), true, false);
413421

414422
auto recieverPtr = new TReceiveActor([](TEvTestSerialization::TPtr ev) {
415-
Cerr << "Blob ID: " << ev->Get()->Record.GetBlobID() << Endl;
416423
UNIT_ASSERT_VALUES_EQUAL(ev->Get()->Record.GetBlobID(), 123u);
417424
UNIT_ASSERT_VALUES_EQUAL(ev->Get()->Record.GetBuffer(), "hello world");
418425
UNIT_ASSERT_VALUES_EQUAL(ev->Get()->GetPayload().size(), 2u);
@@ -436,7 +443,6 @@ TEST_F(XdcRdmaTest, SendRdmaWithMultiGlue) {
436443
auto* ev = MakeMultuGlueTestEvent(123, memPool.get());
437444

438445
auto recieverPtr = new TReceiveActor([](TEvTestSerialization::TPtr ev) {
439-
Cerr << "Blob ID: " << ev->Get()->Record.GetBlobID() << Endl;
440446
UNIT_ASSERT_VALUES_EQUAL(ev->Get()->Record.GetBlobID(), 123u);
441447
UNIT_ASSERT_VALUES_EQUAL(ev->Get()->Record.GetBuffer(), "hello world");
442448
UNIT_ASSERT_VALUES_EQUAL(ev->Get()->GetPayload().size(), 3u);
@@ -461,7 +467,6 @@ TEST_F(XdcRdmaTest, SendMix) {
461467

462468
ui32 index = 0;
463469
auto recieverPtr = new TReceiveActor([&index](TEvTestSerialization::TPtr ev) {
464-
Cerr << "Blob ID: " << ev->Get()->Record.GetBlobID() << Endl;
465470
UNIT_ASSERT_VALUES_EQUAL(ev->Get()->Record.GetBlobID(), index++);
466471
UNIT_ASSERT_VALUES_EQUAL(ev->Get()->Record.GetBuffer(), "hello world");
467472
UNIT_ASSERT_VALUES_EQUAL(ev->Get()->GetPayload().size(), 1u);
@@ -508,3 +513,28 @@ TEST_F(XdcRdmaTest, SendMixBig) {
508513
}
509514
UNIT_ASSERT_VALUES_EQUAL(events.Checks.size(), 0u);
510515
}
516+
517+
TEST_F(XdcRdmaTest, SendMixBigShuffle) {
518+
TTestICCluster cluster(2);
519+
TEventsForTest events(1000, true);
520+
521+
auto recieverPtr = new TReceiveActor([&events](TEvTestSerialization::TPtr ev) {
522+
ui64 blobId = ev->Get()->Record.GetBlobID();
523+
auto checkIt = events.Checks.find(blobId);
524+
UNIT_ASSERT(checkIt != events.Checks.end());
525+
checkIt->second(ev->Get());
526+
events.Checks.erase(checkIt);
527+
});
528+
const TActorId receiver = cluster.RegisterActor(recieverPtr, 1);
529+
Sleep(TDuration::MilliSeconds(1000));
530+
531+
for (auto* ev : events.Events) {
532+
auto senderPtr = new TSendActor(receiver, ev);
533+
cluster.RegisterActor(senderPtr, 2);
534+
}
535+
536+
for (ui32 attempt = 0; attempt < 10 && !events.Checks.empty(); ++attempt) {
537+
Sleep(TDuration::MilliSeconds(1000));
538+
}
539+
UNIT_ASSERT_VALUES_EQUAL(events.Checks.size(), 0u);
540+
}

0 commit comments

Comments
 (0)