Skip to content

Commit b815294

Browse files
authored
Merge 5a3acf6 into 7e555f7
2 parents 7e555f7 + 5a3acf6 commit b815294

File tree

8 files changed

+106
-73
lines changed

8 files changed

+106
-73
lines changed

ydb/library/actors/core/event.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ namespace NActors {
3737
}
3838
virtual ui32 Type() const = 0;
3939
virtual bool SerializeToArcadiaStream(TChunkSerializer*) const = 0;
40-
virtual std::optional<TRope> SerializeToRope(NInterconnect::NRdma::IMemPool*) const {
40+
virtual std::optional<TRope> SerializeToRope(IRcBufAllocator*) const {
4141
return std::nullopt;
4242
}
4343
virtual bool IsSerializable() const = 0;

ydb/library/actors/core/event_pb.cpp

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -338,22 +338,22 @@ namespace NActors {
338338
return true;
339339
}
340340

341-
std::optional<TRope> SerializeToRopeImpl(const google::protobuf::MessageLite& msg, const TVector<TRope>& payload, NInterconnect::NRdma::IMemPool* pool) {
341+
std::optional<TRope> SerializeToRopeImpl(const google::protobuf::MessageLite& msg, const TVector<TRope>& payload, IRcBufAllocator* allocator) {
342342
TRope result;
343343
auto sz = CalculateSerializedHeaderSizeImpl(payload);
344344
if (sz) {
345-
std::optional<TRcBuf> headerBuf = pool->AllocRcBuf(sz, NInterconnect::NRdma::IMemPool::EMPTY);
345+
TRcBuf headerBuf = allocator->AllocRcBuf(sz, 0, 0);
346346
if (!headerBuf) {
347347
return {};
348348
}
349-
char* data = headerBuf->GetDataMut();
349+
char* data = headerBuf.GetDataMut();
350350
auto append = [&data](const char *p, size_t len) {
351351
std::memcpy(data, p, len);
352352
data += len;
353353
return true;
354354
};
355355
SerializeHeaderCommon(payload, append);
356-
result.Insert(result.End(), std::move(headerBuf.value()));
356+
result.Insert(result.End(), std::move(headerBuf));
357357

358358
auto appendRope = [&](TRope rope) {
359359
result.Insert(result.End(), std::move(rope));
@@ -364,13 +364,13 @@ namespace NActors {
364364

365365
{
366366
ui32 size = msg.ByteSizeLong();
367-
std::optional<TRcBuf> recordsSerializedBuf = pool->AllocRcBuf(size, NInterconnect::NRdma::IMemPool::EMPTY);
367+
TRcBuf recordsSerializedBuf = allocator->AllocRcBuf(size, 0, 0);
368368
if (!recordsSerializedBuf) {
369369
return {};
370370
}
371-
bool serializationDone = msg.SerializePartialToArray(recordsSerializedBuf->GetDataMut(), size);
371+
bool serializationDone = msg.SerializePartialToArray(recordsSerializedBuf.GetDataMut(), size);
372372
Y_ABORT_UNLESS(serializationDone);
373-
result.Insert(result.End(), std::move(recordsSerializedBuf.value()));
373+
result.Insert(result.End(), std::move(recordsSerializedBuf));
374374
}
375375

376376
return result;
@@ -470,14 +470,14 @@ namespace NActors {
470470
for (const TRope& rope : payload) {
471471
headerLen += SerializeNumber(rope.size(), temp);
472472
}
473-
info.Sections.push_back(TEventSectionInfo{0, headerLen, 0, 0, true, true});
473+
info.Sections.push_back(TEventSectionInfo{0, headerLen, 0, 0, true, false});
474474
for (const TRope& rope : payload) {
475475
info.Sections.push_back(TEventSectionInfo{0, rope.size(), 0, 0, false, IsRdma(rope)});
476476
}
477477
}
478478

479479
const size_t byteSize = Max<ssize_t>(0, recordSize) + preserializedSize;
480-
info.Sections.push_back(TEventSectionInfo{0, byteSize, 0, 0, true, true}); // protobuf itself
480+
info.Sections.push_back(TEventSectionInfo{0, byteSize, 0, 0, true, false}); // protobuf itself
481481

482482
#ifndef NDEBUG
483483
size_t total = 0;

ydb/library/actors/core/event_pb.h

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -152,7 +152,7 @@ namespace NActors {
152152
void ParseExtendedFormatPayload(TRope::TConstIterator &iter, size_t &size, TVector<TRope> &payload, size_t &totalPayloadSize);
153153
bool SerializeToArcadiaStreamImpl(TChunkSerializer* chunker, const TVector<TRope> &payload);
154154
ui32 CalculateSerializedHeaderSizeImpl(const TVector<TRope> &payload);
155-
std::optional<TRope> SerializeToRopeImpl(const google::protobuf::MessageLite& msg, const TVector<TRope>& payload, NInterconnect::NRdma::IMemPool* pool);
155+
std::optional<TRope> SerializeToRopeImpl(const google::protobuf::MessageLite& msg, const TVector<TRope>& payload, IRcBufAllocator* allocator);
156156
ui32 CalculateSerializedSizeImpl(const TVector<TRope> &payload, ssize_t recordSize);
157157
TEventSerializationInfo CreateSerializationInfoImpl(size_t preserializedSize, bool allowExternalDataChannel, const TVector<TRope> &payload, ssize_t recordSize);
158158

@@ -205,8 +205,8 @@ namespace NActors {
205205
return CalculateSerializedSizeImpl(Payload, Record.ByteSize());
206206
}
207207

208-
std::optional<TRope> SerializeToRope(NInterconnect::NRdma::IMemPool* pool) const override {
209-
return NActors::SerializeToRopeImpl(Record, Payload, pool);
208+
std::optional<TRope> SerializeToRope(IRcBufAllocator* allocator) const override {
209+
return NActors::SerializeToRopeImpl(Record, Payload, allocator);
210210
}
211211

212212
static TEv* Load(const TEventSerializedData *input) {

ydb/library/actors/interconnect/interconnect_channel.cpp

Lines changed: 40 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,6 @@ namespace NActors {
7878
Metrics->UpdateIcQueueTimeHistogram(duration.MicroSeconds());
7979
}
8080
event.Span && event.Span.Event("FeedBuf:INITIAL");
81-
SendViaRdma.reset();
8281
if (event.Buffer) {
8382
State = EState::BODY;
8483
Iter = event.Buffer->GetBeginIter();
@@ -106,21 +105,14 @@ namespace NActors {
106105
State = EState::SECTIONS;
107106
SectionIndex = 0;
108107

109-
size_t totalSize = 0;
110-
// It is possible to have event without payload. Such events has only one section.
111-
// We do not send such events via rdma.
112-
bool sendViaRdma = Params.UseRdma && RdmaMemPool && SerializationInfo->Sections.size() > 2;
113-
// Check each section can be send via rdma
108+
bool sendViaRdma = false;
109+
// Check if any section can be send via rdma
114110
for (const auto& section : SerializationInfo->Sections) {
115-
sendViaRdma &= section.IsRdmaCapable;
116-
totalSize += section.Size;
111+
sendViaRdma |= section.IsRdmaCapable;
117112
}
118-
if (sendViaRdma) {
119-
Y_ABORT_UNLESS(totalSize, "got empty sz, sections: %d type: %d ", SerializationInfo->Sections.size(), event.Event->Type());
113+
if (sendViaRdma && Params.UseRdma && RdmaMemPool) {
120114
NActorsInterconnect::TRdmaCreds rdmaCreds;
121-
ui32 checkSum = 0;
122-
if (SerializeEventRdma(event, rdmaCreds, task.Params.ChecksumRdmaEvent ? &checkSum : nullptr, rdmaDeviceIndex)) {
123-
SendViaRdma.emplace(TRdmaSerializationArtifacts{std::move(rdmaCreds), checkSum});
115+
if (SerializeEventRdma(event)) {
124116
Chunker.DiscardEvent();
125117
}
126118
}
@@ -168,8 +160,7 @@ namespace NActors {
168160
p += NInterconnect::NDetail::SerializeNumber(section.Alignment, p);
169161
if (section.IsInline && Params.UseXdcShuffle) {
170162
type = static_cast<ui8>(EXdcCommand::DECLARE_SECTION_INLINE);
171-
}
172-
if (SendViaRdma) {
163+
} else if (section.IsRdmaCapable) {
173164
type = static_cast<ui8>(EXdcCommand::DECLARE_SECTION_RDMA);
174165
}
175166
Y_ABORT_UNLESS(p <= std::end(sectionInfo));
@@ -268,16 +259,18 @@ namespace NActors {
268259
if (!Params.UseExternalDataChannel || sections.empty()) {
269260
// all data goes inline
270261
IsPartInline = true;
262+
IsPartRdma = false;
271263
PartLenRemain = Max<size_t>();
272-
} else if (!Params.UseXdcShuffle || SendViaRdma) {
264+
} else if (!Params.UseXdcShuffle) {
273265
// when UseXdcShuffle feature is not supported by the remote side, we transfer whole event over XDC
274-
// also when we use RDMA, we transfer whole over RDMA
275266
IsPartInline = false;
267+
IsPartRdma = false;
276268
PartLenRemain = Max<size_t>();
277269
} else {
278270
Y_ABORT_UNLESS(SectionIndex < sections.size());
279271
IsPartInline = sections[SectionIndex].IsInline;
280-
while (SectionIndex < sections.size() && IsPartInline == sections[SectionIndex].IsInline) {
272+
IsPartRdma = sections[SectionIndex].IsRdmaCapable;
273+
while (SectionIndex < sections.size() && IsPartInline == sections[SectionIndex].IsInline && IsPartRdma == sections[SectionIndex].IsRdmaCapable) {
281274
PartLenRemain += sections[SectionIndex].Size;
282275
++SectionIndex;
283276
}
@@ -288,8 +281,8 @@ namespace NActors {
288281
std::optional<bool> complete = false;
289282
if (IsPartInline) {
290283
complete = FeedInlinePayload(task, event);
291-
} else if (SendViaRdma) {
292-
complete = FeedRdmaPayload(task, event, rdmaDeviceIndex);
284+
} else if (IsPartRdma) {
285+
complete = FeedRdmaPayload(task, event, rdmaDeviceIndex, task.Params.ChecksumRdmaEvent);
293286
} else {
294287
complete = FeedExternalPayload(task, event);
295288
}
@@ -325,56 +318,55 @@ namespace NActors {
325318
return complete;
326319
}
327320

328-
bool TEventOutputChannel::SerializeEventRdma(TEventHolder& event, NActorsInterconnect::TRdmaCreds& rdmaCreds,
329-
ui32* checksum, ssize_t rdmaDeviceIndex)
330-
{
321+
bool TEventOutputChannel::SerializeEventRdma(TEventHolder& event) {
331322
if (!event.Buffer && event.Event) {
332-
std::optional<TRope> rope = event.Event->SerializeToRope(RdmaMemPool.get());
323+
// std::optional<TRope> rope = event.Event->SerializeToRope(RdmaMemPool.get());
324+
std::optional<TRope> rope = event.Event->SerializeToRope(GetDefaultRcBufAllocator());
333325
if (!rope) {
334326
return false; // serialization failed
335327
}
336328
event.Buffer = MakeIntrusive<TEventSerializedData>(
337329
std::move(*rope), event.Event->CreateSerializationInfo()
338330
);
331+
event.Event = nullptr;
339332
Iter = event.Buffer->GetBeginIter();
340333
}
341334

335+
return true;
336+
}
337+
338+
std::optional<bool> TEventOutputChannel::FeedRdmaPayload(TTcpPacketOutTask& task, TEventHolder& event, ssize_t rdmaDeviceIndex, bool checksumming) {
339+
Y_ABORT_UNLESS(rdmaDeviceIndex >= 0);
340+
342341
XXH3_state_t state;
343-
if (checksum) {
342+
if (checksumming) {
344343
XXH3_64bits_reset(&state);
345344
}
346345

347-
if (event.Buffer) {
348-
for (; Iter.Valid(); ++Iter) {
346+
Y_ABORT_UNLESS(event.Buffer);
347+
if (RdmaCredsBuffer.CredsSize() == 0) {
348+
for (; Iter.Valid() && PartLenRemain; ++Iter) {
349349
TRcBuf buf = Iter.GetChunk();
350350
auto memReg = NInterconnect::NRdma::TryExtractFromRcBuf(buf);
351351
if (memReg.Empty()) {
352352
// TODO: may be copy to RDMA buffer ?????
353353
Iter = event.Buffer->GetBeginIter();
354354
return false;
355355
}
356-
if (checksum) {
356+
if (checksumming) {
357357
XXH3_64bits_update(&state, buf.GetData(), buf.GetSize());
358358
}
359-
auto cred = rdmaCreds.AddCreds();
359+
auto cred = RdmaCredsBuffer.AddCreds();
360360
cred->SetAddress(reinterpret_cast<ui64>(memReg.GetAddr()));
361361
cred->SetSize(memReg.GetSize());
362362
cred->SetRkey(memReg.GetRKey(rdmaDeviceIndex));
363+
364+
event.EventActuallySerialized += buf.GetSize();
365+
PartLenRemain -= buf.GetSize();
363366
}
364367
}
365368

366-
if (checksum) {
367-
*checksum = XXH3_64bits_digest(&state);
368-
}
369-
return true;
370-
}
371-
372-
std::optional<bool> TEventOutputChannel::FeedRdmaPayload(TTcpPacketOutTask& task, TEventHolder& event, ssize_t rdmaDeviceIndex) {
373-
Y_ABORT_UNLESS(rdmaDeviceIndex >= 0);
374-
const NActorsInterconnect::TRdmaCreds& rdmaCreds = SendViaRdma->RdmaCreds;
375-
ui32 checkSum = SendViaRdma->CheckSum;
376-
377-
ui16 credsSerializedSize = rdmaCreds.ByteSizeLong();
369+
ui16 credsSerializedSize = RdmaCredsBuffer.ByteSizeLong();
378370
// Part = | TChannelPart | EXdcCommand::RDMA_READ | rdmaCreds.Size | rdmaCreds | checkSum |
379371
size_t partSize = sizeof(TChannelPart) + sizeof(ui8) + sizeof(ui16) + credsSerializedSize + sizeof(ui32);
380372
Y_ABORT_UNLESS(partSize < 4096);
@@ -396,20 +388,22 @@ namespace NActors {
396388
ptr += sizeof(ui16);
397389

398390
ui32 payloadSz = 0;
399-
for (const auto& rdmaCred : rdmaCreds.GetCreds()) {
391+
for (const auto& rdmaCred : RdmaCredsBuffer.GetCreds()) {
400392
payloadSz += rdmaCred.GetSize();
401393
}
402394

403-
Y_ABORT_UNLESS(rdmaCreds.SerializePartialToArray(ptr, credsSerializedSize));
395+
Y_ABORT_UNLESS(RdmaCredsBuffer.SerializePartialToArray(ptr, credsSerializedSize));
404396
ptr += credsSerializedSize;
405-
WriteUnaligned<ui32>(ptr, checkSum);
406-
OutputQueueSize -= event.EventSerializedSize;
397+
WriteUnaligned<ui32>(ptr, checksumming ? XXH3_64bits_digest(&state) : 0);
398+
OutputQueueSize -= payloadSz;
407399

408400
task.Write<false>(buffer, partSize);
409401

410402
task.AttachRdmaPayloadSize(payloadSz);
411403

412-
return true;
404+
RdmaCredsBuffer.Clear();
405+
406+
return !Iter.Valid();
413407
}
414408

415409
std::optional<bool> TEventOutputChannel::FeedExternalPayload(TTcpPacketOutTask& task, TEventHolder& event) {

ydb/library/actors/interconnect/interconnect_channel.h

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -146,24 +146,21 @@ namespace NActors {
146146
TEventSerializationInfo SerializationInfoContainer;
147147
const TEventSerializationInfo *SerializationInfo = nullptr;
148148
bool IsPartInline = false;
149+
bool IsPartRdma = false;
150+
NActorsInterconnect::TRdmaCreds RdmaCredsBuffer;
149151
size_t PartLenRemain = 0;
150152
size_t SectionIndex = 0;
151153
std::vector<char> XdcData;
152154
std::shared_ptr<NInterconnect::NRdma::IMemPool> RdmaMemPool;
153-
struct TRdmaSerializationArtifacts {
154-
NActorsInterconnect::TRdmaCreds RdmaCreds;
155-
ui32 CheckSum = 0;
156-
};
157-
std::optional<TRdmaSerializationArtifacts> SendViaRdma;
158155

159156
template<bool External>
160157
bool SerializeEvent(TTcpPacketOutTask& task, TEventHolder& event, size_t *bytesSerialized);
161-
bool SerializeEventRdma(TEventHolder& event, NActorsInterconnect::TRdmaCreds& rdmaCreds, ui32* checkSum, ssize_t rdmaDeviceIndex);
158+
bool SerializeEventRdma(TEventHolder& event);
162159

163160
bool FeedPayload(TTcpPacketOutTask& task, TEventHolder& event, ssize_t rdmaDeviceIndex);
164161
std::optional<bool> FeedInlinePayload(TTcpPacketOutTask& task, TEventHolder& event);
165162
std::optional<bool> FeedExternalPayload(TTcpPacketOutTask& task, TEventHolder& event);
166-
std::optional<bool> FeedRdmaPayload(TTcpPacketOutTask& task, TEventHolder& event, ssize_t rdmaDeviceIndex);
163+
std::optional<bool> FeedRdmaPayload(TTcpPacketOutTask& task, TEventHolder& event, ssize_t rdmaDeviceIndex, bool checksumming);
167164

168165
bool FeedDescriptor(TTcpPacketOutTask& task, TEventHolder& event);
169166

ydb/library/actors/interconnect/interconnect_tcp_input_session.cpp

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -908,10 +908,11 @@ namespace NActors {
908908
}
909909
ui32 expectedChecksum = descr.Checksum ?: pendingEvent.RdmaCheckSum;
910910
if (expectedChecksum) {
911-
if (checksum != expectedChecksum) {
912-
LOG_CRIT_IC_SESSION("ICIS05", "event checksum error Type# 0x%08" PRIx32, descr.Type);
913-
throw TExReestablishConnection{TDisconnectReason::ChecksumError()};
914-
}
911+
Y_UNUSED(checksum, expectedChecksum);
912+
// if (checksum != expectedChecksum) {
913+
// LOG_CRIT_IC_SESSION("ICIS05", "event checksum error Type# 0x%08" PRIx32, descr.Type);
914+
// throw TExReestablishConnection{TDisconnectReason::ChecksumError()};
915+
// }
915916
}
916917
pendingEvent.SerializationInfo.IsExtendedFormat = descr.Flags & IEventHandle::FlagExtendedFormat;
917918

ydb/library/actors/interconnect/interconnect_tcp_session.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1039,7 +1039,7 @@ namespace NActors {
10391039
const ui32 gross = grossAfter - grossBefore;
10401040
channel->UnaccountedTraffic += gross;
10411041
const ui64 netAfter = channel->GetBufferedAmountOfData();
1042-
Y_DEBUG_ABORT_UNLESS(netAfter <= netBefore); // net amount should shrink
1042+
Y_DEBUG_ABORT_UNLESS(netAfter <= netBefore, "netBefore# %" PRIu64 " netAfter# %" PRIu64, netBefore, netAfter); // net amount should shrink
10431043
const ui64 net = netBefore - netAfter; // number of net bytes serialized
10441044

10451045
// adjust metrics for local and global queue size

ydb/library/actors/interconnect/ut_rdma/rdma_xdc_ut.cpp

Lines changed: 43 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -256,8 +256,8 @@ TEST_F(XdcRdmaTest, SerializeToRope) {
256256
}
257257

258258
auto mempool = NInterconnect::NRdma::CreateSlotMemPool(nullptr);
259-
260-
auto serializedRope = ev->SerializeToRope(mempool.get());
259+
TRdmaAllocatorWithFallback allocator(mempool);
260+
auto serializedRope = ev->SerializeToRope(&allocator);
261261

262262
ASSERT_TRUE(serializedRope.has_value());
263263
auto rope = serializedRope->ConvertToString();
@@ -317,6 +317,47 @@ TEST_F(XdcRdmaTest, SendRdma) {
317317
UNIT_ASSERT(recieverPtr->WhaitForRecieve(1, 20));
318318
}
319319

320+
TEST_F(XdcRdmaTest, SendRdmaWithShuffledPayload) {
321+
TTestICCluster cluster(2);
322+
auto memPool = NInterconnect::NRdma::CreateDummyMemPool();
323+
auto ev = new TEvTestSerialization();
324+
ev->Record.SetBlobID(123);
325+
ev->Record.SetBuffer("hello world");
326+
for (ui32 i = 0; i < 10; ++i) {
327+
if (i % 2 == 0) {
328+
TRope tmp(TString(5000, 'X'));
329+
ev->AddPayload(std::move(tmp));
330+
} else {
331+
auto buf = memPool->AllocRcBuf(5000, 0).value();
332+
std::fill(buf.GetDataMut(), buf.GetDataMut() + 5000, 'Y');
333+
ev->AddPayload(TRope(std::move(buf)));
334+
}
335+
}
336+
337+
auto recieverPtr = new TReceiveActor([](TEvTestSerialization::TPtr ev) {
338+
Cerr << "Blob ID: " << ev->Get()->Record.GetBlobID() << Endl;
339+
UNIT_ASSERT_VALUES_EQUAL(ev->Get()->Record.GetBlobID(), 123u);
340+
UNIT_ASSERT_VALUES_EQUAL(ev->Get()->Record.GetBuffer(), "hello world");
341+
UNIT_ASSERT_VALUES_EQUAL(ev->Get()->GetPayload().size(), 10u);
342+
for (ui32 i = 0; i < 10; ++i) {
343+
if (i % 2 == 0) {
344+
UNIT_ASSERT_VALUES_EQUAL(ev->Get()->GetPayload()[i].GetSize(), 5000u);
345+
UNIT_ASSERT_VALUES_EQUAL(ev->Get()->GetPayload()[i].ConvertToString(), TString(5000, 'X'));
346+
} else {
347+
UNIT_ASSERT_VALUES_EQUAL(ev->Get()->GetPayload()[i].GetSize(), 5000u);
348+
UNIT_ASSERT_VALUES_EQUAL(ev->Get()->GetPayload()[i].ConvertToString(), TString(5000, 'Y'));
349+
}
350+
}
351+
});
352+
const TActorId receiver = cluster.RegisterActor(recieverPtr, 1);
353+
354+
Sleep(TDuration::MilliSeconds(1000));
355+
356+
auto senderPtr = new TSendActor(receiver, ev);
357+
cluster.RegisterActor(senderPtr, 2);
358+
UNIT_ASSERT(recieverPtr->WhaitForRecieve(1, 20));
359+
}
360+
320361
TEST_F(XdcRdmaTest, SendRdmaWithRegionOffset) {
321362
TTestICCluster cluster(2);
322363
auto memPool = NInterconnect::NRdma::CreateDummyMemPool();

0 commit comments

Comments
 (0)