Skip to content

Commit 621ae7a

Browse files
authored
Merge 80bdd02 into 6471d14
2 parents 6471d14 + 80bdd02 commit 621ae7a

File tree

9 files changed

+125
-88
lines changed

9 files changed

+125
-88
lines changed

ydb/library/actors/core/event.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ namespace NActors {
3737
}
3838
virtual ui32 Type() const = 0;
3939
virtual bool SerializeToArcadiaStream(TChunkSerializer*) const = 0;
40-
virtual std::optional<TRope> SerializeToRope(NInterconnect::NRdma::IMemPool*) const {
40+
virtual std::optional<TRope> SerializeToRope(IRcBufAllocator*) const {
4141
return std::nullopt;
4242
}
4343
virtual bool IsSerializable() const = 0;

ydb/library/actors/core/event_pb.cpp

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -338,22 +338,22 @@ namespace NActors {
338338
return true;
339339
}
340340

341-
std::optional<TRope> SerializeToRopeImpl(const google::protobuf::MessageLite& msg, const TVector<TRope>& payload, NInterconnect::NRdma::IMemPool* pool) {
341+
std::optional<TRope> SerializeToRopeImpl(const google::protobuf::MessageLite& msg, const TVector<TRope>& payload, IRcBufAllocator* allocator) {
342342
TRope result;
343343
auto sz = CalculateSerializedHeaderSizeImpl(payload);
344344
if (sz) {
345-
std::optional<TRcBuf> headerBuf = pool->AllocRcBuf(sz, NInterconnect::NRdma::IMemPool::EMPTY);
345+
TRcBuf headerBuf = allocator->AllocRcBuf(sz, 0, 0);
346346
if (!headerBuf) {
347347
return {};
348348
}
349-
char* data = headerBuf->GetDataMut();
349+
char* data = headerBuf.GetDataMut();
350350
auto append = [&data](const char *p, size_t len) {
351351
std::memcpy(data, p, len);
352352
data += len;
353353
return true;
354354
};
355355
SerializeHeaderCommon(payload, append);
356-
result.Insert(result.End(), std::move(headerBuf.value()));
356+
result.Insert(result.End(), std::move(headerBuf));
357357

358358
auto appendRope = [&](TRope rope) {
359359
result.Insert(result.End(), std::move(rope));
@@ -364,13 +364,13 @@ namespace NActors {
364364

365365
{
366366
ui32 size = msg.ByteSizeLong();
367-
std::optional<TRcBuf> recordsSerializedBuf = pool->AllocRcBuf(size, NInterconnect::NRdma::IMemPool::EMPTY);
367+
TRcBuf recordsSerializedBuf = allocator->AllocRcBuf(size, 0, 0);
368368
if (!recordsSerializedBuf) {
369369
return {};
370370
}
371-
bool serializationDone = msg.SerializePartialToArray(recordsSerializedBuf->GetDataMut(), size);
371+
bool serializationDone = msg.SerializePartialToArray(recordsSerializedBuf.GetDataMut(), size);
372372
Y_ABORT_UNLESS(serializationDone);
373-
result.Insert(result.End(), std::move(recordsSerializedBuf.value()));
373+
result.Insert(result.End(), std::move(recordsSerializedBuf));
374374
}
375375

376376
return result;
@@ -470,14 +470,14 @@ namespace NActors {
470470
for (const TRope& rope : payload) {
471471
headerLen += SerializeNumber(rope.size(), temp);
472472
}
473-
info.Sections.push_back(TEventSectionInfo{0, headerLen, 0, 0, true, true});
473+
info.Sections.push_back(TEventSectionInfo{0, headerLen, 0, 0, true, false});
474474
for (const TRope& rope : payload) {
475475
info.Sections.push_back(TEventSectionInfo{0, rope.size(), 0, 0, false, IsRdma(rope)});
476476
}
477477
}
478478

479479
const size_t byteSize = Max<ssize_t>(0, recordSize) + preserializedSize;
480-
info.Sections.push_back(TEventSectionInfo{0, byteSize, 0, 0, true, true}); // protobuf itself
480+
info.Sections.push_back(TEventSectionInfo{0, byteSize, 0, 0, true, false}); // protobuf itself
481481

482482
#ifndef NDEBUG
483483
size_t total = 0;

ydb/library/actors/core/event_pb.h

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -152,7 +152,7 @@ namespace NActors {
152152
void ParseExtendedFormatPayload(TRope::TConstIterator &iter, size_t &size, TVector<TRope> &payload, size_t &totalPayloadSize);
153153
bool SerializeToArcadiaStreamImpl(TChunkSerializer* chunker, const TVector<TRope> &payload);
154154
ui32 CalculateSerializedHeaderSizeImpl(const TVector<TRope> &payload);
155-
std::optional<TRope> SerializeToRopeImpl(const google::protobuf::MessageLite& msg, const TVector<TRope>& payload, NInterconnect::NRdma::IMemPool* pool);
155+
std::optional<TRope> SerializeToRopeImpl(const google::protobuf::MessageLite& msg, const TVector<TRope>& payload, IRcBufAllocator* allocator);
156156
ui32 CalculateSerializedSizeImpl(const TVector<TRope> &payload, ssize_t recordSize);
157157
TEventSerializationInfo CreateSerializationInfoImpl(size_t preserializedSize, bool allowExternalDataChannel, const TVector<TRope> &payload, ssize_t recordSize);
158158

@@ -205,8 +205,8 @@ namespace NActors {
205205
return CalculateSerializedSizeImpl(Payload, Record.ByteSize());
206206
}
207207

208-
std::optional<TRope> SerializeToRope(NInterconnect::NRdma::IMemPool* pool) const override {
209-
return NActors::SerializeToRopeImpl(Record, Payload, pool);
208+
std::optional<TRope> SerializeToRope(IRcBufAllocator* allocator) const override {
209+
return NActors::SerializeToRopeImpl(Record, Payload, allocator);
210210
}
211211

212212
static TEv* Load(const TEventSerializedData *input) {

ydb/library/actors/interconnect/interconnect_channel.cpp

Lines changed: 47 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,6 @@ namespace NActors {
7878
Metrics->UpdateIcQueueTimeHistogram(duration.MicroSeconds());
7979
}
8080
event.Span && event.Span.Event("FeedBuf:INITIAL");
81-
SendViaRdma.reset();
8281
if (event.Buffer) {
8382
State = EState::BODY;
8483
Iter = event.Buffer->GetBeginIter();
@@ -105,22 +104,15 @@ namespace NActors {
105104
} else if (Params.UseExternalDataChannel && !SerializationInfo->Sections.empty()) {
106105
State = EState::SECTIONS;
107106
SectionIndex = 0;
107+
XXH3_64bits_reset(&RdmaCumulativeChecksumState);
108108

109-
size_t totalSize = 0;
110-
// It is possible to have event without payload. Such events has only one section.
111-
// We do not send such events via rdma.
112-
bool sendViaRdma = Params.UseRdma && RdmaMemPool && SerializationInfo->Sections.size() > 2;
113-
// Check each section can be send via rdma
109+
bool sendViaRdma = false;
110+
// Check if any section can be send via rdma
114111
for (const auto& section : SerializationInfo->Sections) {
115-
sendViaRdma &= section.IsRdmaCapable;
116-
totalSize += section.Size;
112+
sendViaRdma |= section.IsRdmaCapable;
117113
}
118-
if (sendViaRdma) {
119-
Y_ABORT_UNLESS(totalSize, "got empty sz, sections: %d type: %d ", SerializationInfo->Sections.size(), event.Event->Type());
120-
NActorsInterconnect::TRdmaCreds rdmaCreds;
121-
ui32 checkSum = 0;
122-
if (SerializeEventRdma(event, rdmaCreds, task.Params.ChecksumRdmaEvent ? &checkSum : nullptr, rdmaDeviceIndex)) {
123-
SendViaRdma.emplace(TRdmaSerializationArtifacts{std::move(rdmaCreds), checkSum});
114+
if (sendViaRdma && Params.UseRdma && RdmaMemPool) {
115+
if (SerializeEventRdma(event)) {
124116
Chunker.DiscardEvent();
125117
}
126118
}
@@ -168,8 +160,7 @@ namespace NActors {
168160
p += NInterconnect::NDetail::SerializeNumber(section.Alignment, p);
169161
if (section.IsInline && Params.UseXdcShuffle) {
170162
type = static_cast<ui8>(EXdcCommand::DECLARE_SECTION_INLINE);
171-
}
172-
if (SendViaRdma) {
163+
} else if (section.IsRdmaCapable) {
173164
type = static_cast<ui8>(EXdcCommand::DECLARE_SECTION_RDMA);
174165
}
175166
Y_ABORT_UNLESS(p <= std::end(sectionInfo));
@@ -268,16 +259,18 @@ namespace NActors {
268259
if (!Params.UseExternalDataChannel || sections.empty()) {
269260
// all data goes inline
270261
IsPartInline = true;
262+
IsPartRdma = false;
271263
PartLenRemain = Max<size_t>();
272-
} else if (!Params.UseXdcShuffle || SendViaRdma) {
264+
} else if (!Params.UseXdcShuffle) {
273265
// when UseXdcShuffle feature is not supported by the remote side, we transfer whole event over XDC
274-
// also when we use RDMA, we transfer whole over RDMA
275266
IsPartInline = false;
267+
IsPartRdma = false;
276268
PartLenRemain = Max<size_t>();
277269
} else {
278270
Y_ABORT_UNLESS(SectionIndex < sections.size());
279271
IsPartInline = sections[SectionIndex].IsInline;
280-
while (SectionIndex < sections.size() && IsPartInline == sections[SectionIndex].IsInline) {
272+
IsPartRdma = sections[SectionIndex].IsRdmaCapable;
273+
while (SectionIndex < sections.size() && IsPartInline == sections[SectionIndex].IsInline && IsPartRdma == sections[SectionIndex].IsRdmaCapable) {
281274
PartLenRemain += sections[SectionIndex].Size;
282275
++SectionIndex;
283276
}
@@ -288,8 +281,8 @@ namespace NActors {
288281
std::optional<bool> complete = false;
289282
if (IsPartInline) {
290283
complete = FeedInlinePayload(task, event);
291-
} else if (SendViaRdma) {
292-
complete = FeedRdmaPayload(task, event, rdmaDeviceIndex);
284+
} else if (IsPartRdma) {
285+
complete = FeedRdmaPayload(task, event, rdmaDeviceIndex, task.Params.ChecksumRdmaEvent);
293286
} else {
294287
complete = FeedExternalPayload(task, event);
295288
}
@@ -325,56 +318,54 @@ namespace NActors {
325318
return complete;
326319
}
327320

328-
bool TEventOutputChannel::SerializeEventRdma(TEventHolder& event, NActorsInterconnect::TRdmaCreds& rdmaCreds,
329-
ui32* checksum, ssize_t rdmaDeviceIndex)
330-
{
321+
bool TEventOutputChannel::SerializeEventRdma(TEventHolder& event) {
331322
if (!event.Buffer && event.Event) {
332-
std::optional<TRope> rope = event.Event->SerializeToRope(RdmaMemPool.get());
323+
std::optional<TRope> rope = event.Event->SerializeToRope(GetDefaultRcBufAllocator());
333324
if (!rope) {
334325
return false; // serialization failed
335326
}
336327
event.Buffer = MakeIntrusive<TEventSerializedData>(
337328
std::move(*rope), event.Event->CreateSerializationInfo()
338329
);
330+
event.Event = nullptr;
339331
Iter = event.Buffer->GetBeginIter();
340332
}
341333

342-
XXH3_state_t state;
343-
if (checksum) {
344-
XXH3_64bits_reset(&state);
345-
}
334+
return true;
335+
}
336+
337+
std::optional<bool> TEventOutputChannel::FeedRdmaPayload(TTcpPacketOutTask& task, TEventHolder& event, ssize_t rdmaDeviceIndex, bool checksumming) {
338+
Y_ABORT_UNLESS(rdmaDeviceIndex >= 0);
346339

347-
if (event.Buffer) {
348-
for (; Iter.Valid(); ++Iter) {
340+
Y_ABORT_UNLESS(event.Buffer);
341+
if (RdmaCredsBuffer.CredsSize() == 0) {
342+
auto prevIter = Iter;
343+
size_t prevPartLenRemain = PartLenRemain;
344+
for (; Iter.Valid() && PartLenRemain; ++Iter) {
349345
TRcBuf buf = Iter.GetChunk();
350346
auto memReg = NInterconnect::NRdma::TryExtractFromRcBuf(buf);
351347
if (memReg.Empty()) {
352-
// TODO: may be copy to RDMA buffer ?????
353-
Iter = event.Buffer->GetBeginIter();
348+
Iter = prevIter;
349+
IsPartRdma = false;
350+
RdmaCredsBuffer.Clear();
351+
PartLenRemain = prevPartLenRemain;
354352
return false;
355353
}
356-
if (checksum) {
357-
XXH3_64bits_update(&state, buf.GetData(), buf.GetSize());
354+
if (checksumming) {
355+
XXH3_64bits_update(&RdmaCumulativeChecksumState, buf.GetData(), buf.GetSize());
358356
}
359-
auto cred = rdmaCreds.AddCreds();
357+
auto cred = RdmaCredsBuffer.AddCreds();
360358
cred->SetAddress(reinterpret_cast<ui64>(memReg.GetAddr()));
361359
cred->SetSize(memReg.GetSize());
362360
cred->SetRkey(memReg.GetRKey(rdmaDeviceIndex));
361+
362+
event.EventActuallySerialized += buf.GetSize();
363+
PartLenRemain -= buf.GetSize();
363364
}
364365
}
365-
366-
if (checksum) {
367-
*checksum = XXH3_64bits_digest(&state);
368-
}
369-
return true;
370-
}
371-
372-
std::optional<bool> TEventOutputChannel::FeedRdmaPayload(TTcpPacketOutTask& task, TEventHolder& event, ssize_t rdmaDeviceIndex) {
373-
Y_ABORT_UNLESS(rdmaDeviceIndex >= 0);
374-
const NActorsInterconnect::TRdmaCreds& rdmaCreds = SendViaRdma->RdmaCreds;
375-
ui32 checkSum = SendViaRdma->CheckSum;
376-
377-
ui16 credsSerializedSize = rdmaCreds.ByteSizeLong();
366+
Y_ABORT_UNLESS(PartLenRemain == 0);
367+
368+
ui16 credsSerializedSize = RdmaCredsBuffer.ByteSizeLong();
378369
// Part = | TChannelPart | EXdcCommand::RDMA_READ | rdmaCreds.Size | rdmaCreds | checkSum |
379370
size_t partSize = sizeof(TChannelPart) + sizeof(ui8) + sizeof(ui16) + credsSerializedSize + sizeof(ui32);
380371
Y_ABORT_UNLESS(partSize < 4096);
@@ -396,20 +387,22 @@ namespace NActors {
396387
ptr += sizeof(ui16);
397388

398389
ui32 payloadSz = 0;
399-
for (const auto& rdmaCred : rdmaCreds.GetCreds()) {
390+
for (const auto& rdmaCred : RdmaCredsBuffer.GetCreds()) {
400391
payloadSz += rdmaCred.GetSize();
401392
}
402393

403-
Y_ABORT_UNLESS(rdmaCreds.SerializePartialToArray(ptr, credsSerializedSize));
394+
Y_ABORT_UNLESS(RdmaCredsBuffer.SerializePartialToArray(ptr, credsSerializedSize));
404395
ptr += credsSerializedSize;
405-
WriteUnaligned<ui32>(ptr, checkSum);
406-
OutputQueueSize -= event.EventSerializedSize;
396+
WriteUnaligned<ui32>(ptr, checksumming ? XXH3_64bits_digest(&RdmaCumulativeChecksumState) : 0);
397+
OutputQueueSize -= payloadSz;
407398

408399
task.Write<false>(buffer, partSize);
409400

410401
task.AttachRdmaPayloadSize(payloadSz);
411402

412-
return true;
403+
RdmaCredsBuffer.Clear();
404+
405+
return !Iter.Valid();
413406
}
414407

415408
std::optional<bool> TEventOutputChannel::FeedExternalPayload(TTcpPacketOutTask& task, TEventHolder& event) {

ydb/library/actors/interconnect/interconnect_channel.h

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -146,24 +146,22 @@ namespace NActors {
146146
TEventSerializationInfo SerializationInfoContainer;
147147
const TEventSerializationInfo *SerializationInfo = nullptr;
148148
bool IsPartInline = false;
149+
bool IsPartRdma = false;
150+
NActorsInterconnect::TRdmaCreds RdmaCredsBuffer;
149151
size_t PartLenRemain = 0;
150152
size_t SectionIndex = 0;
151153
std::vector<char> XdcData;
152154
std::shared_ptr<NInterconnect::NRdma::IMemPool> RdmaMemPool;
153-
struct TRdmaSerializationArtifacts {
154-
NActorsInterconnect::TRdmaCreds RdmaCreds;
155-
ui32 CheckSum = 0;
156-
};
157-
std::optional<TRdmaSerializationArtifacts> SendViaRdma;
155+
XXH3_state_t RdmaCumulativeChecksumState;
158156

159157
template<bool External>
160158
bool SerializeEvent(TTcpPacketOutTask& task, TEventHolder& event, size_t *bytesSerialized);
161-
bool SerializeEventRdma(TEventHolder& event, NActorsInterconnect::TRdmaCreds& rdmaCreds, ui32* checkSum, ssize_t rdmaDeviceIndex);
159+
bool SerializeEventRdma(TEventHolder& event);
162160

163161
bool FeedPayload(TTcpPacketOutTask& task, TEventHolder& event, ssize_t rdmaDeviceIndex);
164162
std::optional<bool> FeedInlinePayload(TTcpPacketOutTask& task, TEventHolder& event);
165163
std::optional<bool> FeedExternalPayload(TTcpPacketOutTask& task, TEventHolder& event);
166-
std::optional<bool> FeedRdmaPayload(TTcpPacketOutTask& task, TEventHolder& event, ssize_t rdmaDeviceIndex);
164+
std::optional<bool> FeedRdmaPayload(TTcpPacketOutTask& task, TEventHolder& event, ssize_t rdmaDeviceIndex, bool checksumming);
167165

168166
bool FeedDescriptor(TTcpPacketOutTask& task, TEventHolder& event);
169167

ydb/library/actors/interconnect/interconnect_tcp_input_session.cpp

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -803,9 +803,9 @@ namespace NActors {
803803
Y_ABORT_UNLESS(creds.ParseFromArray(ptr, credsSerializedSize));
804804
ptr += credsSerializedSize;
805805
if (Params.ChecksumRdmaEvent) {
806-
context.PendingEvents.back().RdmaCheckSum = ReadUnaligned<ui32>(ptr);
806+
context.PendingEvents.back().RdmaCumulativeCheckSum = ReadUnaligned<ui32>(ptr);
807807
} else {
808-
context.PendingEvents.back().RdmaCheckSum = 0;
808+
context.PendingEvents.back().RdmaCumulativeCheckSum = 0;
809809
}
810810
ptr += sizeof(ui32);
811811
auto err = context.ScheduleRdmaReadRequests(creds, RdmaCq, SelfId(), channel);
@@ -898,18 +898,23 @@ namespace NActors {
898898
for (const auto&& [data, size] : payload) {
899899
checksum = Crc32cExtendMSanCompatible(checksum, data, size);
900900
}
901-
} else if (pendingEvent.RdmaCheckSum) {
901+
if (checksum != descr.Checksum) {
902+
LOG_CRIT_IC_SESSION("ICIS05", "event checksum error Type# 0x%08" PRIx32, descr.Type);
903+
throw TExReestablishConnection{TDisconnectReason::ChecksumError()};
904+
}
905+
}
906+
if (pendingEvent.RdmaCumulativeCheckSum) {
902907
XXH3_state_t state;
903908
XXH3_64bits_reset(&state);
904-
for (const auto&& [data, size] : payload) {
905-
XXH3_64bits_update(&state, data, size);
909+
for (auto iter = payload.Begin(); iter.Valid(); ++iter) {
910+
auto memRegion = NInterconnect::NRdma::TryExtractFromRcBuf(iter.GetChunk());
911+
if (!memRegion.Empty()) {
912+
XXH3_64bits_update(&state, memRegion.GetAddr(), memRegion.GetSize());
913+
}
906914
}
907915
checksum = XXH3_64bits_digest(&state);
908-
}
909-
ui32 expectedChecksum = descr.Checksum ?: pendingEvent.RdmaCheckSum;
910-
if (expectedChecksum) {
911-
if (checksum != expectedChecksum) {
912-
LOG_CRIT_IC_SESSION("ICIS05", "event checksum error Type# 0x%08" PRIx32, descr.Type);
916+
if (checksum != pendingEvent.RdmaCumulativeCheckSum) {
917+
LOG_CRIT_IC_SESSION("ICIS05", "event rdma checksum error Type# 0x%08" PRIx32, descr.Type);
913918
throw TExReestablishConnection{TDisconnectReason::ChecksumError()};
914919
}
915920
}

ydb/library/actors/interconnect/interconnect_tcp_session.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1039,7 +1039,7 @@ namespace NActors {
10391039
const ui32 gross = grossAfter - grossBefore;
10401040
channel->UnaccountedTraffic += gross;
10411041
const ui64 netAfter = channel->GetBufferedAmountOfData();
1042-
Y_DEBUG_ABORT_UNLESS(netAfter <= netBefore); // net amount should shrink
1042+
Y_DEBUG_ABORT_UNLESS(netAfter <= netBefore, "netBefore# %" PRIu64 " netAfter# %" PRIu64, netBefore, netAfter); // net amount should shrink
10431043
const ui64 net = netBefore - netAfter; // number of net bytes serialized
10441044

10451045
// adjust metrics for local and global queue size

ydb/library/actors/interconnect/interconnect_tcp_session.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -155,7 +155,7 @@ namespace NActors {
155155
std::deque<NInterconnect::NRdma::TMemRegionSlice> RdmaBuffers;
156156
TRdmaReadContext::TPtr RdmaReadContext = nullptr;
157157
size_t RdmaSize = 0;
158-
ui32 RdmaCheckSum = 0;
158+
ui32 RdmaCumulativeCheckSum = 0;
159159
};
160160

161161
std::deque<TPendingEvent> PendingEvents;

0 commit comments

Comments
 (0)