From be446678159afe706c80a62e95159006e993108f Mon Sep 17 00:00:00 2001 From: robdrynkin Date: Mon, 24 Nov 2025 09:39:07 +0100 Subject: [PATCH 1/5] Support xdc shuffle for RDMA --- ydb/library/actors/core/event.h | 2 +- ydb/library/actors/core/event_pb.cpp | 14 ++-- ydb/library/actors/core/event_pb.h | 6 +- .../interconnect/interconnect_channel.cpp | 73 +++++++++---------- .../interconnect/interconnect_channel.h | 11 +-- .../interconnect_tcp_input_session.cpp | 9 ++- .../interconnect/interconnect_tcp_session.cpp | 2 +- .../interconnect/ut_rdma/rdma_xdc_ut.cpp | 4 +- 8 files changed, 59 insertions(+), 62 deletions(-) diff --git a/ydb/library/actors/core/event.h b/ydb/library/actors/core/event.h index b2239a2caa95..86369f8f5ee6 100644 --- a/ydb/library/actors/core/event.h +++ b/ydb/library/actors/core/event.h @@ -37,7 +37,7 @@ namespace NActors { } virtual ui32 Type() const = 0; virtual bool SerializeToArcadiaStream(TChunkSerializer*) const = 0; - virtual std::optional SerializeToRope(NInterconnect::NRdma::IMemPool*) const { + virtual std::optional SerializeToRope(IRcBufAllocator*) const { return std::nullopt; } virtual bool IsSerializable() const = 0; diff --git a/ydb/library/actors/core/event_pb.cpp b/ydb/library/actors/core/event_pb.cpp index e30cab049b7c..84d39c634bd8 100644 --- a/ydb/library/actors/core/event_pb.cpp +++ b/ydb/library/actors/core/event_pb.cpp @@ -338,22 +338,22 @@ namespace NActors { return true; } - std::optional SerializeToRopeImpl(const google::protobuf::MessageLite& msg, const TVector& payload, NInterconnect::NRdma::IMemPool* pool) { + std::optional SerializeToRopeImpl(const google::protobuf::MessageLite& msg, const TVector& payload, IRcBufAllocator* allocator) { TRope result; auto sz = CalculateSerializedHeaderSizeImpl(payload); if (sz) { - std::optional headerBuf = pool->AllocRcBuf(sz, NInterconnect::NRdma::IMemPool::EMPTY); + TRcBuf headerBuf = allocator->AllocRcBuf(sz, 0, 0); if (!headerBuf) { return {}; } - char* data = headerBuf->GetDataMut(); + char* data = headerBuf.GetDataMut(); auto append = [&data](const char *p, size_t len) { std::memcpy(data, p, len); data += len; return true; }; SerializeHeaderCommon(payload, append); - result.Insert(result.End(), std::move(headerBuf.value())); + result.Insert(result.End(), std::move(headerBuf)); auto appendRope = [&](TRope rope) { result.Insert(result.End(), std::move(rope)); @@ -364,13 +364,13 @@ namespace NActors { { ui32 size = msg.ByteSizeLong(); - std::optional recordsSerializedBuf = pool->AllocRcBuf(size, NInterconnect::NRdma::IMemPool::EMPTY); + TRcBuf recordsSerializedBuf = allocator->AllocRcBuf(size, 0, 0); if (!recordsSerializedBuf) { return {}; } - bool serializationDone = msg.SerializePartialToArray(recordsSerializedBuf->GetDataMut(), size); + bool serializationDone = msg.SerializePartialToArray(recordsSerializedBuf.GetDataMut(), size); Y_ABORT_UNLESS(serializationDone); - result.Insert(result.End(), std::move(recordsSerializedBuf.value())); + result.Insert(result.End(), std::move(recordsSerializedBuf)); } return result; diff --git a/ydb/library/actors/core/event_pb.h b/ydb/library/actors/core/event_pb.h index 723727023770..08aaaa0faf62 100644 --- a/ydb/library/actors/core/event_pb.h +++ b/ydb/library/actors/core/event_pb.h @@ -152,7 +152,7 @@ namespace NActors { void ParseExtendedFormatPayload(TRope::TConstIterator &iter, size_t &size, TVector &payload, size_t &totalPayloadSize); bool SerializeToArcadiaStreamImpl(TChunkSerializer* chunker, const TVector &payload); ui32 CalculateSerializedHeaderSizeImpl(const TVector &payload); - std::optional SerializeToRopeImpl(const google::protobuf::MessageLite& msg, const TVector& payload, NInterconnect::NRdma::IMemPool* pool); + std::optional SerializeToRopeImpl(const google::protobuf::MessageLite& msg, const TVector& payload, IRcBufAllocator* allocator); ui32 CalculateSerializedSizeImpl(const TVector &payload, ssize_t recordSize); TEventSerializationInfo CreateSerializationInfoImpl(size_t preserializedSize, bool allowExternalDataChannel, const TVector &payload, ssize_t recordSize); @@ -205,8 +205,8 @@ namespace NActors { return CalculateSerializedSizeImpl(Payload, Record.ByteSize()); } - std::optional SerializeToRope(NInterconnect::NRdma::IMemPool* pool) const override { - return NActors::SerializeToRopeImpl(Record, Payload, pool); + std::optional SerializeToRope(IRcBufAllocator* allocator) const override { + return NActors::SerializeToRopeImpl(Record, Payload, allocator); } static TEv* Load(const TEventSerializedData *input) { diff --git a/ydb/library/actors/interconnect/interconnect_channel.cpp b/ydb/library/actors/interconnect/interconnect_channel.cpp index db2d128e7950..969b496da335 100644 --- a/ydb/library/actors/interconnect/interconnect_channel.cpp +++ b/ydb/library/actors/interconnect/interconnect_channel.cpp @@ -78,7 +78,6 @@ namespace NActors { Metrics->UpdateIcQueueTimeHistogram(duration.MicroSeconds()); } event.Span && event.Span.Event("FeedBuf:INITIAL"); - SendViaRdma.reset(); if (event.Buffer) { State = EState::BODY; Iter = event.Buffer->GetBeginIter(); @@ -118,9 +117,7 @@ namespace NActors { if (sendViaRdma) { Y_ABORT_UNLESS(totalSize, "got empty sz, sections: %d type: %d ", SerializationInfo->Sections.size(), event.Event->Type()); NActorsInterconnect::TRdmaCreds rdmaCreds; - ui32 checkSum = 0; - if (SerializeEventRdma(event, rdmaCreds, task.Params.ChecksumRdmaEvent ? &checkSum : nullptr, rdmaDeviceIndex)) { - SendViaRdma.emplace(TRdmaSerializationArtifacts{std::move(rdmaCreds), checkSum}); + if (SerializeEventRdma(event)) { Chunker.DiscardEvent(); } } @@ -168,8 +165,7 @@ namespace NActors { p += NInterconnect::NDetail::SerializeNumber(section.Alignment, p); if (section.IsInline && Params.UseXdcShuffle) { type = static_cast(EXdcCommand::DECLARE_SECTION_INLINE); - } - if (SendViaRdma) { + } else if (section.IsRdmaCapable) { type = static_cast(EXdcCommand::DECLARE_SECTION_RDMA); } Y_ABORT_UNLESS(p <= std::end(sectionInfo)); @@ -268,16 +264,18 @@ namespace NActors { if (!Params.UseExternalDataChannel || sections.empty()) { // all data goes inline IsPartInline = true; + IsPartRdma = false; PartLenRemain = Max(); - } else if (!Params.UseXdcShuffle || SendViaRdma) { + } else if (!Params.UseXdcShuffle) { // when UseXdcShuffle feature is not supported by the remote side, we transfer whole event over XDC - // also when we use RDMA, we transfer whole over RDMA IsPartInline = false; + IsPartRdma = false; PartLenRemain = Max(); } else { Y_ABORT_UNLESS(SectionIndex < sections.size()); IsPartInline = sections[SectionIndex].IsInline; - while (SectionIndex < sections.size() && IsPartInline == sections[SectionIndex].IsInline) { + IsPartRdma = sections[SectionIndex].IsRdmaCapable; + while (SectionIndex < sections.size() && IsPartInline == sections[SectionIndex].IsInline && IsPartRdma == sections[SectionIndex].IsRdmaCapable) { PartLenRemain += sections[SectionIndex].Size; ++SectionIndex; } @@ -288,8 +286,8 @@ namespace NActors { std::optional complete = false; if (IsPartInline) { complete = FeedInlinePayload(task, event); - } else if (SendViaRdma) { - complete = FeedRdmaPayload(task, event, rdmaDeviceIndex); + } else if (IsPartRdma) { + complete = FeedRdmaPayload(task, event, rdmaDeviceIndex, task.Params.ChecksumRdmaEvent); } else { complete = FeedExternalPayload(task, event); } @@ -325,27 +323,34 @@ namespace NActors { return complete; } - bool TEventOutputChannel::SerializeEventRdma(TEventHolder& event, NActorsInterconnect::TRdmaCreds& rdmaCreds, - ui32* checksum, ssize_t rdmaDeviceIndex) - { + bool TEventOutputChannel::SerializeEventRdma(TEventHolder& event) { if (!event.Buffer && event.Event) { - std::optional rope = event.Event->SerializeToRope(RdmaMemPool.get()); + // std::optional rope = event.Event->SerializeToRope(RdmaMemPool.get()); + std::optional rope = event.Event->SerializeToRope(GetDefaultRcBufAllocator()); if (!rope) { return false; // serialization failed } event.Buffer = MakeIntrusive( std::move(*rope), event.Event->CreateSerializationInfo() ); + event.Event = nullptr; Iter = event.Buffer->GetBeginIter(); } + return true; + } + + std::optional TEventOutputChannel::FeedRdmaPayload(TTcpPacketOutTask& task, TEventHolder& event, ssize_t rdmaDeviceIndex, bool checksumming) { + Y_ABORT_UNLESS(rdmaDeviceIndex >= 0); + XXH3_state_t state; - if (checksum) { + if (checksumming) { XXH3_64bits_reset(&state); } - if (event.Buffer) { - for (; Iter.Valid(); ++Iter) { + Y_ABORT_UNLESS(event.Buffer); + if (RdmaCredsBuffer.CredsSize() == 0) { + for (; Iter.Valid() && PartLenRemain; ++Iter) { TRcBuf buf = Iter.GetChunk(); auto memReg = NInterconnect::NRdma::TryExtractFromRcBuf(buf); if (memReg.Empty()) { @@ -353,28 +358,20 @@ namespace NActors { Iter = event.Buffer->GetBeginIter(); return false; } - if (checksum) { + if (checksumming) { XXH3_64bits_update(&state, buf.GetData(), buf.GetSize()); } - auto cred = rdmaCreds.AddCreds(); + auto cred = RdmaCredsBuffer.AddCreds(); cred->SetAddress(reinterpret_cast(memReg.GetAddr())); cred->SetSize(memReg.GetSize()); cred->SetRkey(memReg.GetRKey(rdmaDeviceIndex)); + + event.EventActuallySerialized += buf.GetSize(); + PartLenRemain -= buf.GetSize(); } } - if (checksum) { - *checksum = XXH3_64bits_digest(&state); - } - return true; - } - - std::optional TEventOutputChannel::FeedRdmaPayload(TTcpPacketOutTask& task, TEventHolder& event, ssize_t rdmaDeviceIndex) { - Y_ABORT_UNLESS(rdmaDeviceIndex >= 0); - const NActorsInterconnect::TRdmaCreds& rdmaCreds = SendViaRdma->RdmaCreds; - ui32 checkSum = SendViaRdma->CheckSum; - - ui16 credsSerializedSize = rdmaCreds.ByteSizeLong(); + ui16 credsSerializedSize = RdmaCredsBuffer.ByteSizeLong(); // Part = | TChannelPart | EXdcCommand::RDMA_READ | rdmaCreds.Size | rdmaCreds | checkSum | size_t partSize = sizeof(TChannelPart) + sizeof(ui8) + sizeof(ui16) + credsSerializedSize + sizeof(ui32); Y_ABORT_UNLESS(partSize < 4096); @@ -396,20 +393,22 @@ namespace NActors { ptr += sizeof(ui16); ui32 payloadSz = 0; - for (const auto& rdmaCred : rdmaCreds.GetCreds()) { + for (const auto& rdmaCred : RdmaCredsBuffer.GetCreds()) { payloadSz += rdmaCred.GetSize(); } - Y_ABORT_UNLESS(rdmaCreds.SerializePartialToArray(ptr, credsSerializedSize)); + Y_ABORT_UNLESS(RdmaCredsBuffer.SerializePartialToArray(ptr, credsSerializedSize)); ptr += credsSerializedSize; - WriteUnaligned(ptr, checkSum); - OutputQueueSize -= event.EventSerializedSize; + WriteUnaligned(ptr, checksumming ? XXH3_64bits_digest(&state) : 0); + OutputQueueSize -= payloadSz; task.Write(buffer, partSize); task.AttachRdmaPayloadSize(payloadSz); - return true; + RdmaCredsBuffer.Clear(); + + return !Iter.Valid(); } std::optional TEventOutputChannel::FeedExternalPayload(TTcpPacketOutTask& task, TEventHolder& event) { diff --git a/ydb/library/actors/interconnect/interconnect_channel.h b/ydb/library/actors/interconnect/interconnect_channel.h index 7c28e88a9270..98b68e50c9d8 100644 --- a/ydb/library/actors/interconnect/interconnect_channel.h +++ b/ydb/library/actors/interconnect/interconnect_channel.h @@ -146,24 +146,21 @@ namespace NActors { TEventSerializationInfo SerializationInfoContainer; const TEventSerializationInfo *SerializationInfo = nullptr; bool IsPartInline = false; + bool IsPartRdma = false; + NActorsInterconnect::TRdmaCreds RdmaCredsBuffer; size_t PartLenRemain = 0; size_t SectionIndex = 0; std::vector XdcData; std::shared_ptr RdmaMemPool; - struct TRdmaSerializationArtifacts { - NActorsInterconnect::TRdmaCreds RdmaCreds; - ui32 CheckSum = 0; - }; - std::optional SendViaRdma; template bool SerializeEvent(TTcpPacketOutTask& task, TEventHolder& event, size_t *bytesSerialized); - bool SerializeEventRdma(TEventHolder& event, NActorsInterconnect::TRdmaCreds& rdmaCreds, ui32* checkSum, ssize_t rdmaDeviceIndex); + bool SerializeEventRdma(TEventHolder& event); bool FeedPayload(TTcpPacketOutTask& task, TEventHolder& event, ssize_t rdmaDeviceIndex); std::optional FeedInlinePayload(TTcpPacketOutTask& task, TEventHolder& event); std::optional FeedExternalPayload(TTcpPacketOutTask& task, TEventHolder& event); - std::optional FeedRdmaPayload(TTcpPacketOutTask& task, TEventHolder& event, ssize_t rdmaDeviceIndex); + std::optional FeedRdmaPayload(TTcpPacketOutTask& task, TEventHolder& event, ssize_t rdmaDeviceIndex, bool checksumming); bool FeedDescriptor(TTcpPacketOutTask& task, TEventHolder& event); diff --git a/ydb/library/actors/interconnect/interconnect_tcp_input_session.cpp b/ydb/library/actors/interconnect/interconnect_tcp_input_session.cpp index 6b7d25478cae..ca385ca63616 100644 --- a/ydb/library/actors/interconnect/interconnect_tcp_input_session.cpp +++ b/ydb/library/actors/interconnect/interconnect_tcp_input_session.cpp @@ -908,10 +908,11 @@ namespace NActors { } ui32 expectedChecksum = descr.Checksum ?: pendingEvent.RdmaCheckSum; if (expectedChecksum) { - if (checksum != expectedChecksum) { - LOG_CRIT_IC_SESSION("ICIS05", "event checksum error Type# 0x%08" PRIx32, descr.Type); - throw TExReestablishConnection{TDisconnectReason::ChecksumError()}; - } + Y_UNUSED(checksum, expectedChecksum); + // if (checksum != expectedChecksum) { + // LOG_CRIT_IC_SESSION("ICIS05", "event checksum error Type# 0x%08" PRIx32, descr.Type); + // throw TExReestablishConnection{TDisconnectReason::ChecksumError()}; + // } } pendingEvent.SerializationInfo.IsExtendedFormat = descr.Flags & IEventHandle::FlagExtendedFormat; diff --git a/ydb/library/actors/interconnect/interconnect_tcp_session.cpp b/ydb/library/actors/interconnect/interconnect_tcp_session.cpp index 2f64d1a3fc77..8aca381085d1 100644 --- a/ydb/library/actors/interconnect/interconnect_tcp_session.cpp +++ b/ydb/library/actors/interconnect/interconnect_tcp_session.cpp @@ -1039,7 +1039,7 @@ namespace NActors { const ui32 gross = grossAfter - grossBefore; channel->UnaccountedTraffic += gross; const ui64 netAfter = channel->GetBufferedAmountOfData(); - Y_DEBUG_ABORT_UNLESS(netAfter <= netBefore); // net amount should shrink + Y_DEBUG_ABORT_UNLESS(netAfter <= netBefore, "netBefore# %" PRIu64 " netAfter# %" PRIu64, netBefore, netAfter); // net amount should shrink const ui64 net = netBefore - netAfter; // number of net bytes serialized // adjust metrics for local and global queue size diff --git a/ydb/library/actors/interconnect/ut_rdma/rdma_xdc_ut.cpp b/ydb/library/actors/interconnect/ut_rdma/rdma_xdc_ut.cpp index e37253feb2d3..23c192422809 100644 --- a/ydb/library/actors/interconnect/ut_rdma/rdma_xdc_ut.cpp +++ b/ydb/library/actors/interconnect/ut_rdma/rdma_xdc_ut.cpp @@ -256,8 +256,8 @@ TEST_F(XdcRdmaTest, SerializeToRope) { } auto mempool = NInterconnect::NRdma::CreateSlotMemPool(nullptr); - - auto serializedRope = ev->SerializeToRope(mempool.get()); + TRdmaAllocatorWithFallback allocator(mempool); + auto serializedRope = ev->SerializeToRope(&allocator); ASSERT_TRUE(serializedRope.has_value()); auto rope = serializedRope->ConvertToString(); From 5a3acf6825905b8f91368b2f5751a88d09d19c6b Mon Sep 17 00:00:00 2001 From: robdrynkin Date: Mon, 24 Nov 2025 10:42:26 +0100 Subject: [PATCH 2/5] Add test --- ydb/library/actors/core/event_pb.cpp | 4 +- .../interconnect/interconnect_channel.cpp | 13 ++---- .../interconnect/ut_rdma/rdma_xdc_ut.cpp | 41 +++++++++++++++++++ 3 files changed, 47 insertions(+), 11 deletions(-) diff --git a/ydb/library/actors/core/event_pb.cpp b/ydb/library/actors/core/event_pb.cpp index 84d39c634bd8..8508fda0de24 100644 --- a/ydb/library/actors/core/event_pb.cpp +++ b/ydb/library/actors/core/event_pb.cpp @@ -470,14 +470,14 @@ namespace NActors { for (const TRope& rope : payload) { headerLen += SerializeNumber(rope.size(), temp); } - info.Sections.push_back(TEventSectionInfo{0, headerLen, 0, 0, true, true}); + info.Sections.push_back(TEventSectionInfo{0, headerLen, 0, 0, true, false}); for (const TRope& rope : payload) { info.Sections.push_back(TEventSectionInfo{0, rope.size(), 0, 0, false, IsRdma(rope)}); } } const size_t byteSize = Max(0, recordSize) + preserializedSize; - info.Sections.push_back(TEventSectionInfo{0, byteSize, 0, 0, true, true}); // protobuf itself + info.Sections.push_back(TEventSectionInfo{0, byteSize, 0, 0, true, false}); // protobuf itself #ifndef NDEBUG size_t total = 0; diff --git a/ydb/library/actors/interconnect/interconnect_channel.cpp b/ydb/library/actors/interconnect/interconnect_channel.cpp index 969b496da335..65e11299a137 100644 --- a/ydb/library/actors/interconnect/interconnect_channel.cpp +++ b/ydb/library/actors/interconnect/interconnect_channel.cpp @@ -105,17 +105,12 @@ namespace NActors { State = EState::SECTIONS; SectionIndex = 0; - size_t totalSize = 0; - // It is possible to have event without payload. Such events has only one section. - // We do not send such events via rdma. - bool sendViaRdma = Params.UseRdma && RdmaMemPool && SerializationInfo->Sections.size() > 2; - // Check each section can be send via rdma + bool sendViaRdma = false; + // Check if any section can be send via rdma for (const auto& section : SerializationInfo->Sections) { - sendViaRdma &= section.IsRdmaCapable; - totalSize += section.Size; + sendViaRdma |= section.IsRdmaCapable; } - if (sendViaRdma) { - Y_ABORT_UNLESS(totalSize, "got empty sz, sections: %d type: %d ", SerializationInfo->Sections.size(), event.Event->Type()); + if (sendViaRdma && Params.UseRdma && RdmaMemPool) { NActorsInterconnect::TRdmaCreds rdmaCreds; if (SerializeEventRdma(event)) { Chunker.DiscardEvent(); diff --git a/ydb/library/actors/interconnect/ut_rdma/rdma_xdc_ut.cpp b/ydb/library/actors/interconnect/ut_rdma/rdma_xdc_ut.cpp index 23c192422809..057e2b1b8266 100644 --- a/ydb/library/actors/interconnect/ut_rdma/rdma_xdc_ut.cpp +++ b/ydb/library/actors/interconnect/ut_rdma/rdma_xdc_ut.cpp @@ -317,6 +317,47 @@ TEST_F(XdcRdmaTest, SendRdma) { UNIT_ASSERT(recieverPtr->WhaitForRecieve(1, 20)); } +TEST_F(XdcRdmaTest, SendRdmaWithShuffledPayload) { + TTestICCluster cluster(2); + auto memPool = NInterconnect::NRdma::CreateDummyMemPool(); + auto ev = new TEvTestSerialization(); + ev->Record.SetBlobID(123); + ev->Record.SetBuffer("hello world"); + for (ui32 i = 0; i < 10; ++i) { + if (i % 2 == 0) { + TRope tmp(TString(5000, 'X')); + ev->AddPayload(std::move(tmp)); + } else { + auto buf = memPool->AllocRcBuf(5000, 0).value(); + std::fill(buf.GetDataMut(), buf.GetDataMut() + 5000, 'Y'); + ev->AddPayload(TRope(std::move(buf))); + } + } + + auto recieverPtr = new TReceiveActor([](TEvTestSerialization::TPtr ev) { + Cerr << "Blob ID: " << ev->Get()->Record.GetBlobID() << Endl; + UNIT_ASSERT_VALUES_EQUAL(ev->Get()->Record.GetBlobID(), 123u); + UNIT_ASSERT_VALUES_EQUAL(ev->Get()->Record.GetBuffer(), "hello world"); + UNIT_ASSERT_VALUES_EQUAL(ev->Get()->GetPayload().size(), 10u); + for (ui32 i = 0; i < 10; ++i) { + if (i % 2 == 0) { + UNIT_ASSERT_VALUES_EQUAL(ev->Get()->GetPayload()[i].GetSize(), 5000u); + UNIT_ASSERT_VALUES_EQUAL(ev->Get()->GetPayload()[i].ConvertToString(), TString(5000, 'X')); + } else { + UNIT_ASSERT_VALUES_EQUAL(ev->Get()->GetPayload()[i].GetSize(), 5000u); + UNIT_ASSERT_VALUES_EQUAL(ev->Get()->GetPayload()[i].ConvertToString(), TString(5000, 'Y')); + } + } + }); + const TActorId receiver = cluster.RegisterActor(recieverPtr, 1); + + Sleep(TDuration::MilliSeconds(1000)); + + auto senderPtr = new TSendActor(receiver, ev); + cluster.RegisterActor(senderPtr, 2); + UNIT_ASSERT(recieverPtr->WhaitForRecieve(1, 20)); +} + TEST_F(XdcRdmaTest, SendRdmaWithRegionOffset) { TTestICCluster cluster(2); auto memPool = NInterconnect::NRdma::CreateDummyMemPool(); From c779a5937cab7393215b41ef829afc8f6c005b4c Mon Sep 17 00:00:00 2001 From: robdrynkin Date: Mon, 24 Nov 2025 10:48:44 +0100 Subject: [PATCH 3/5] improve error handling --- .../actors/interconnect/interconnect_channel.cpp | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/ydb/library/actors/interconnect/interconnect_channel.cpp b/ydb/library/actors/interconnect/interconnect_channel.cpp index 65e11299a137..35b37ccd0d05 100644 --- a/ydb/library/actors/interconnect/interconnect_channel.cpp +++ b/ydb/library/actors/interconnect/interconnect_channel.cpp @@ -320,7 +320,6 @@ namespace NActors { bool TEventOutputChannel::SerializeEventRdma(TEventHolder& event) { if (!event.Buffer && event.Event) { - // std::optional rope = event.Event->SerializeToRope(RdmaMemPool.get()); std::optional rope = event.Event->SerializeToRope(GetDefaultRcBufAllocator()); if (!rope) { return false; // serialization failed @@ -345,12 +344,16 @@ namespace NActors { Y_ABORT_UNLESS(event.Buffer); if (RdmaCredsBuffer.CredsSize() == 0) { + auto prevIter = Iter; + size_t prevPartLenRemain = PartLenRemain; for (; Iter.Valid() && PartLenRemain; ++Iter) { TRcBuf buf = Iter.GetChunk(); auto memReg = NInterconnect::NRdma::TryExtractFromRcBuf(buf); if (memReg.Empty()) { - // TODO: may be copy to RDMA buffer ????? - Iter = event.Buffer->GetBeginIter(); + Iter = prevIter; + IsPartRdma = false; + RdmaCredsBuffer.Clear(); + PartLenRemain = prevPartLenRemain; return false; } if (checksumming) { @@ -365,7 +368,8 @@ namespace NActors { PartLenRemain -= buf.GetSize(); } } - + Y_ABORT_UNLESS(PartLenRemain == 0); + ui16 credsSerializedSize = RdmaCredsBuffer.ByteSizeLong(); // Part = | TChannelPart | EXdcCommand::RDMA_READ | rdmaCreds.Size | rdmaCreds | checkSum | size_t partSize = sizeof(TChannelPart) + sizeof(ui8) + sizeof(ui16) + credsSerializedSize + sizeof(ui32); From ade9f81e742072d0c1cb17036f1748939a62638d Mon Sep 17 00:00:00 2001 From: robdrynkin Date: Mon, 24 Nov 2025 13:14:26 +0100 Subject: [PATCH 4/5] Add checksum support --- .../interconnect/interconnect_channel.cpp | 11 +++----- .../interconnect/interconnect_channel.h | 1 + .../interconnect_tcp_input_session.cpp | 26 +++++++++++-------- 3 files changed, 19 insertions(+), 19 deletions(-) diff --git a/ydb/library/actors/interconnect/interconnect_channel.cpp b/ydb/library/actors/interconnect/interconnect_channel.cpp index 35b37ccd0d05..bbf6c83f0cda 100644 --- a/ydb/library/actors/interconnect/interconnect_channel.cpp +++ b/ydb/library/actors/interconnect/interconnect_channel.cpp @@ -104,6 +104,7 @@ namespace NActors { } else if (Params.UseExternalDataChannel && !SerializationInfo->Sections.empty()) { State = EState::SECTIONS; SectionIndex = 0; + XXH3_64bits_reset(&RdmaChecksumState); bool sendViaRdma = false; // Check if any section can be send via rdma @@ -111,7 +112,6 @@ namespace NActors { sendViaRdma |= section.IsRdmaCapable; } if (sendViaRdma && Params.UseRdma && RdmaMemPool) { - NActorsInterconnect::TRdmaCreds rdmaCreds; if (SerializeEventRdma(event)) { Chunker.DiscardEvent(); } @@ -337,11 +337,6 @@ namespace NActors { std::optional TEventOutputChannel::FeedRdmaPayload(TTcpPacketOutTask& task, TEventHolder& event, ssize_t rdmaDeviceIndex, bool checksumming) { Y_ABORT_UNLESS(rdmaDeviceIndex >= 0); - XXH3_state_t state; - if (checksumming) { - XXH3_64bits_reset(&state); - } - Y_ABORT_UNLESS(event.Buffer); if (RdmaCredsBuffer.CredsSize() == 0) { auto prevIter = Iter; @@ -357,7 +352,7 @@ namespace NActors { return false; } if (checksumming) { - XXH3_64bits_update(&state, buf.GetData(), buf.GetSize()); + XXH3_64bits_update(&RdmaChecksumState, buf.GetData(), buf.GetSize()); } auto cred = RdmaCredsBuffer.AddCreds(); cred->SetAddress(reinterpret_cast(memReg.GetAddr())); @@ -398,7 +393,7 @@ namespace NActors { Y_ABORT_UNLESS(RdmaCredsBuffer.SerializePartialToArray(ptr, credsSerializedSize)); ptr += credsSerializedSize; - WriteUnaligned(ptr, checksumming ? XXH3_64bits_digest(&state) : 0); + WriteUnaligned(ptr, checksumming ? XXH3_64bits_digest(&RdmaChecksumState) : 0); OutputQueueSize -= payloadSz; task.Write(buffer, partSize); diff --git a/ydb/library/actors/interconnect/interconnect_channel.h b/ydb/library/actors/interconnect/interconnect_channel.h index 98b68e50c9d8..8fd8a47f452d 100644 --- a/ydb/library/actors/interconnect/interconnect_channel.h +++ b/ydb/library/actors/interconnect/interconnect_channel.h @@ -152,6 +152,7 @@ namespace NActors { size_t SectionIndex = 0; std::vector XdcData; std::shared_ptr RdmaMemPool; + XXH3_state_t RdmaChecksumState; template bool SerializeEvent(TTcpPacketOutTask& task, TEventHolder& event, size_t *bytesSerialized); diff --git a/ydb/library/actors/interconnect/interconnect_tcp_input_session.cpp b/ydb/library/actors/interconnect/interconnect_tcp_input_session.cpp index ca385ca63616..3341839b4ccf 100644 --- a/ydb/library/actors/interconnect/interconnect_tcp_input_session.cpp +++ b/ydb/library/actors/interconnect/interconnect_tcp_input_session.cpp @@ -898,21 +898,25 @@ namespace NActors { for (const auto&& [data, size] : payload) { checksum = Crc32cExtendMSanCompatible(checksum, data, size); } - } else if (pendingEvent.RdmaCheckSum) { + if (checksum != descr.Checksum) { + LOG_CRIT_IC_SESSION("ICIS05", "event checksum error Type# 0x%08" PRIx32, descr.Type); + throw TExReestablishConnection{TDisconnectReason::ChecksumError()}; + } + } + if (pendingEvent.RdmaCheckSum) { XXH3_state_t state; XXH3_64bits_reset(&state); - for (const auto&& [data, size] : payload) { - XXH3_64bits_update(&state, data, size); + for (auto iter = payload.Begin(); iter.Valid(); ++iter) { + auto memRegion = NInterconnect::NRdma::TryExtractFromRcBuf(iter.GetChunk()); + if (!memRegion.Empty()) { + XXH3_64bits_update(&state, memRegion.GetAddr(), memRegion.GetSize()); + } } checksum = XXH3_64bits_digest(&state); - } - ui32 expectedChecksum = descr.Checksum ?: pendingEvent.RdmaCheckSum; - if (expectedChecksum) { - Y_UNUSED(checksum, expectedChecksum); - // if (checksum != expectedChecksum) { - // LOG_CRIT_IC_SESSION("ICIS05", "event checksum error Type# 0x%08" PRIx32, descr.Type); - // throw TExReestablishConnection{TDisconnectReason::ChecksumError()}; - // } + if (checksum != pendingEvent.RdmaCheckSum) { + LOG_CRIT_IC_SESSION("ICIS05", "event rdma checksum error Type# 0x%08" PRIx32, descr.Type); + throw TExReestablishConnection{TDisconnectReason::ChecksumError()}; + } } pendingEvent.SerializationInfo.IsExtendedFormat = descr.Flags & IEventHandle::FlagExtendedFormat; From 80bdd0277af9f0477d36cd1283293d94ae52467d Mon Sep 17 00:00:00 2001 From: robdrynkin Date: Mon, 24 Nov 2025 14:19:32 +0100 Subject: [PATCH 5/5] Rename checksum to cumulative checksum --- ydb/library/actors/interconnect/interconnect_channel.cpp | 6 +++--- ydb/library/actors/interconnect/interconnect_channel.h | 2 +- .../interconnect/interconnect_tcp_input_session.cpp | 8 ++++---- .../actors/interconnect/interconnect_tcp_session.h | 2 +- 4 files changed, 9 insertions(+), 9 deletions(-) diff --git a/ydb/library/actors/interconnect/interconnect_channel.cpp b/ydb/library/actors/interconnect/interconnect_channel.cpp index bbf6c83f0cda..44e4c75542c3 100644 --- a/ydb/library/actors/interconnect/interconnect_channel.cpp +++ b/ydb/library/actors/interconnect/interconnect_channel.cpp @@ -104,7 +104,7 @@ namespace NActors { } else if (Params.UseExternalDataChannel && !SerializationInfo->Sections.empty()) { State = EState::SECTIONS; SectionIndex = 0; - XXH3_64bits_reset(&RdmaChecksumState); + XXH3_64bits_reset(&RdmaCumulativeChecksumState); bool sendViaRdma = false; // Check if any section can be send via rdma @@ -352,7 +352,7 @@ namespace NActors { return false; } if (checksumming) { - XXH3_64bits_update(&RdmaChecksumState, buf.GetData(), buf.GetSize()); + XXH3_64bits_update(&RdmaCumulativeChecksumState, buf.GetData(), buf.GetSize()); } auto cred = RdmaCredsBuffer.AddCreds(); cred->SetAddress(reinterpret_cast(memReg.GetAddr())); @@ -393,7 +393,7 @@ namespace NActors { Y_ABORT_UNLESS(RdmaCredsBuffer.SerializePartialToArray(ptr, credsSerializedSize)); ptr += credsSerializedSize; - WriteUnaligned(ptr, checksumming ? XXH3_64bits_digest(&RdmaChecksumState) : 0); + WriteUnaligned(ptr, checksumming ? XXH3_64bits_digest(&RdmaCumulativeChecksumState) : 0); OutputQueueSize -= payloadSz; task.Write(buffer, partSize); diff --git a/ydb/library/actors/interconnect/interconnect_channel.h b/ydb/library/actors/interconnect/interconnect_channel.h index 8fd8a47f452d..571936646876 100644 --- a/ydb/library/actors/interconnect/interconnect_channel.h +++ b/ydb/library/actors/interconnect/interconnect_channel.h @@ -152,7 +152,7 @@ namespace NActors { size_t SectionIndex = 0; std::vector XdcData; std::shared_ptr RdmaMemPool; - XXH3_state_t RdmaChecksumState; + XXH3_state_t RdmaCumulativeChecksumState; template bool SerializeEvent(TTcpPacketOutTask& task, TEventHolder& event, size_t *bytesSerialized); diff --git a/ydb/library/actors/interconnect/interconnect_tcp_input_session.cpp b/ydb/library/actors/interconnect/interconnect_tcp_input_session.cpp index 3341839b4ccf..5bf87fe01512 100644 --- a/ydb/library/actors/interconnect/interconnect_tcp_input_session.cpp +++ b/ydb/library/actors/interconnect/interconnect_tcp_input_session.cpp @@ -803,9 +803,9 @@ namespace NActors { Y_ABORT_UNLESS(creds.ParseFromArray(ptr, credsSerializedSize)); ptr += credsSerializedSize; if (Params.ChecksumRdmaEvent) { - context.PendingEvents.back().RdmaCheckSum = ReadUnaligned(ptr); + context.PendingEvents.back().RdmaCumulativeCheckSum = ReadUnaligned(ptr); } else { - context.PendingEvents.back().RdmaCheckSum = 0; + context.PendingEvents.back().RdmaCumulativeCheckSum = 0; } ptr += sizeof(ui32); auto err = context.ScheduleRdmaReadRequests(creds, RdmaCq, SelfId(), channel); @@ -903,7 +903,7 @@ namespace NActors { throw TExReestablishConnection{TDisconnectReason::ChecksumError()}; } } - if (pendingEvent.RdmaCheckSum) { + if (pendingEvent.RdmaCumulativeCheckSum) { XXH3_state_t state; XXH3_64bits_reset(&state); for (auto iter = payload.Begin(); iter.Valid(); ++iter) { @@ -913,7 +913,7 @@ namespace NActors { } } checksum = XXH3_64bits_digest(&state); - if (checksum != pendingEvent.RdmaCheckSum) { + if (checksum != pendingEvent.RdmaCumulativeCheckSum) { LOG_CRIT_IC_SESSION("ICIS05", "event rdma checksum error Type# 0x%08" PRIx32, descr.Type); throw TExReestablishConnection{TDisconnectReason::ChecksumError()}; } diff --git a/ydb/library/actors/interconnect/interconnect_tcp_session.h b/ydb/library/actors/interconnect/interconnect_tcp_session.h index d30aaf413394..169b1d7c54ea 100644 --- a/ydb/library/actors/interconnect/interconnect_tcp_session.h +++ b/ydb/library/actors/interconnect/interconnect_tcp_session.h @@ -155,7 +155,7 @@ namespace NActors { std::deque RdmaBuffers; TRdmaReadContext::TPtr RdmaReadContext = nullptr; size_t RdmaSize = 0; - ui32 RdmaCheckSum = 0; + ui32 RdmaCumulativeCheckSum = 0; }; std::deque PendingEvents;