Skip to content

Commit 5a3acf6

Browse files
committed
Add test
1 parent be44667 commit 5a3acf6

File tree

3 files changed

+47
-11
lines changed

3 files changed

+47
-11
lines changed

ydb/library/actors/core/event_pb.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -470,14 +470,14 @@ namespace NActors {
470470
for (const TRope& rope : payload) {
471471
headerLen += SerializeNumber(rope.size(), temp);
472472
}
473-
info.Sections.push_back(TEventSectionInfo{0, headerLen, 0, 0, true, true});
473+
info.Sections.push_back(TEventSectionInfo{0, headerLen, 0, 0, true, false});
474474
for (const TRope& rope : payload) {
475475
info.Sections.push_back(TEventSectionInfo{0, rope.size(), 0, 0, false, IsRdma(rope)});
476476
}
477477
}
478478

479479
const size_t byteSize = Max<ssize_t>(0, recordSize) + preserializedSize;
480-
info.Sections.push_back(TEventSectionInfo{0, byteSize, 0, 0, true, true}); // protobuf itself
480+
info.Sections.push_back(TEventSectionInfo{0, byteSize, 0, 0, true, false}); // protobuf itself
481481

482482
#ifndef NDEBUG
483483
size_t total = 0;

ydb/library/actors/interconnect/interconnect_channel.cpp

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -105,17 +105,12 @@ namespace NActors {
105105
State = EState::SECTIONS;
106106
SectionIndex = 0;
107107

108-
size_t totalSize = 0;
109-
// It is possible to have event without payload. Such events has only one section.
110-
// We do not send such events via rdma.
111-
bool sendViaRdma = Params.UseRdma && RdmaMemPool && SerializationInfo->Sections.size() > 2;
112-
// Check each section can be send via rdma
108+
bool sendViaRdma = false;
109+
// Check if any section can be send via rdma
113110
for (const auto& section : SerializationInfo->Sections) {
114-
sendViaRdma &= section.IsRdmaCapable;
115-
totalSize += section.Size;
111+
sendViaRdma |= section.IsRdmaCapable;
116112
}
117-
if (sendViaRdma) {
118-
Y_ABORT_UNLESS(totalSize, "got empty sz, sections: %d type: %d ", SerializationInfo->Sections.size(), event.Event->Type());
113+
if (sendViaRdma && Params.UseRdma && RdmaMemPool) {
119114
NActorsInterconnect::TRdmaCreds rdmaCreds;
120115
if (SerializeEventRdma(event)) {
121116
Chunker.DiscardEvent();

ydb/library/actors/interconnect/ut_rdma/rdma_xdc_ut.cpp

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -317,6 +317,47 @@ TEST_F(XdcRdmaTest, SendRdma) {
317317
UNIT_ASSERT(recieverPtr->WhaitForRecieve(1, 20));
318318
}
319319

320+
TEST_F(XdcRdmaTest, SendRdmaWithShuffledPayload) {
321+
TTestICCluster cluster(2);
322+
auto memPool = NInterconnect::NRdma::CreateDummyMemPool();
323+
auto ev = new TEvTestSerialization();
324+
ev->Record.SetBlobID(123);
325+
ev->Record.SetBuffer("hello world");
326+
for (ui32 i = 0; i < 10; ++i) {
327+
if (i % 2 == 0) {
328+
TRope tmp(TString(5000, 'X'));
329+
ev->AddPayload(std::move(tmp));
330+
} else {
331+
auto buf = memPool->AllocRcBuf(5000, 0).value();
332+
std::fill(buf.GetDataMut(), buf.GetDataMut() + 5000, 'Y');
333+
ev->AddPayload(TRope(std::move(buf)));
334+
}
335+
}
336+
337+
auto recieverPtr = new TReceiveActor([](TEvTestSerialization::TPtr ev) {
338+
Cerr << "Blob ID: " << ev->Get()->Record.GetBlobID() << Endl;
339+
UNIT_ASSERT_VALUES_EQUAL(ev->Get()->Record.GetBlobID(), 123u);
340+
UNIT_ASSERT_VALUES_EQUAL(ev->Get()->Record.GetBuffer(), "hello world");
341+
UNIT_ASSERT_VALUES_EQUAL(ev->Get()->GetPayload().size(), 10u);
342+
for (ui32 i = 0; i < 10; ++i) {
343+
if (i % 2 == 0) {
344+
UNIT_ASSERT_VALUES_EQUAL(ev->Get()->GetPayload()[i].GetSize(), 5000u);
345+
UNIT_ASSERT_VALUES_EQUAL(ev->Get()->GetPayload()[i].ConvertToString(), TString(5000, 'X'));
346+
} else {
347+
UNIT_ASSERT_VALUES_EQUAL(ev->Get()->GetPayload()[i].GetSize(), 5000u);
348+
UNIT_ASSERT_VALUES_EQUAL(ev->Get()->GetPayload()[i].ConvertToString(), TString(5000, 'Y'));
349+
}
350+
}
351+
});
352+
const TActorId receiver = cluster.RegisterActor(recieverPtr, 1);
353+
354+
Sleep(TDuration::MilliSeconds(1000));
355+
356+
auto senderPtr = new TSendActor(receiver, ev);
357+
cluster.RegisterActor(senderPtr, 2);
358+
UNIT_ASSERT(recieverPtr->WhaitForRecieve(1, 20));
359+
}
360+
320361
TEST_F(XdcRdmaTest, SendRdmaWithRegionOffset) {
321362
TTestICCluster cluster(2);
322363
auto memPool = NInterconnect::NRdma::CreateDummyMemPool();

0 commit comments

Comments
 (0)