Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ydb/library/actors/core/event.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ namespace NActors {
}
virtual ui32 Type() const = 0;
virtual bool SerializeToArcadiaStream(TChunkSerializer*) const = 0;
virtual std::optional<TRope> SerializeToRope(NInterconnect::NRdma::IMemPool*) const {
virtual std::optional<TRope> SerializeToRope(IRcBufAllocator*) const {
return std::nullopt;
}
virtual bool IsSerializable() const = 0;
Expand Down
18 changes: 9 additions & 9 deletions ydb/library/actors/core/event_pb.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -338,22 +338,22 @@ namespace NActors {
return true;
}

std::optional<TRope> SerializeToRopeImpl(const google::protobuf::MessageLite& msg, const TVector<TRope>& payload, NInterconnect::NRdma::IMemPool* pool) {
std::optional<TRope> SerializeToRopeImpl(const google::protobuf::MessageLite& msg, const TVector<TRope>& payload, IRcBufAllocator* allocator) {
TRope result;
auto sz = CalculateSerializedHeaderSizeImpl(payload);
if (sz) {
std::optional<TRcBuf> headerBuf = pool->AllocRcBuf(sz, NInterconnect::NRdma::IMemPool::EMPTY);
TRcBuf headerBuf = allocator->AllocRcBuf(sz, 0, 0);
if (!headerBuf) {
return {};
}
char* data = headerBuf->GetDataMut();
char* data = headerBuf.GetDataMut();
auto append = [&data](const char *p, size_t len) {
std::memcpy(data, p, len);
data += len;
return true;
};
SerializeHeaderCommon(payload, append);
result.Insert(result.End(), std::move(headerBuf.value()));
result.Insert(result.End(), std::move(headerBuf));

auto appendRope = [&](TRope rope) {
result.Insert(result.End(), std::move(rope));
Expand All @@ -364,13 +364,13 @@ namespace NActors {

{
ui32 size = msg.ByteSizeLong();
std::optional<TRcBuf> recordsSerializedBuf = pool->AllocRcBuf(size, NInterconnect::NRdma::IMemPool::EMPTY);
TRcBuf recordsSerializedBuf = allocator->AllocRcBuf(size, 0, 0);
if (!recordsSerializedBuf) {
return {};
}
bool serializationDone = msg.SerializePartialToArray(recordsSerializedBuf->GetDataMut(), size);
bool serializationDone = msg.SerializePartialToArray(recordsSerializedBuf.GetDataMut(), size);
Y_ABORT_UNLESS(serializationDone);
result.Insert(result.End(), std::move(recordsSerializedBuf.value()));
result.Insert(result.End(), std::move(recordsSerializedBuf));
}

return result;
Expand Down Expand Up @@ -470,14 +470,14 @@ namespace NActors {
for (const TRope& rope : payload) {
headerLen += SerializeNumber(rope.size(), temp);
}
info.Sections.push_back(TEventSectionInfo{0, headerLen, 0, 0, true, true});
info.Sections.push_back(TEventSectionInfo{0, headerLen, 0, 0, true, false});
for (const TRope& rope : payload) {
info.Sections.push_back(TEventSectionInfo{0, rope.size(), 0, 0, false, IsRdma(rope)});
}
}

const size_t byteSize = Max<ssize_t>(0, recordSize) + preserializedSize;
info.Sections.push_back(TEventSectionInfo{0, byteSize, 0, 0, true, true}); // protobuf itself
info.Sections.push_back(TEventSectionInfo{0, byteSize, 0, 0, true, false}); // protobuf itself

#ifndef NDEBUG
size_t total = 0;
Expand Down
6 changes: 3 additions & 3 deletions ydb/library/actors/core/event_pb.h
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ namespace NActors {
void ParseExtendedFormatPayload(TRope::TConstIterator &iter, size_t &size, TVector<TRope> &payload, size_t &totalPayloadSize);
bool SerializeToArcadiaStreamImpl(TChunkSerializer* chunker, const TVector<TRope> &payload);
ui32 CalculateSerializedHeaderSizeImpl(const TVector<TRope> &payload);
std::optional<TRope> SerializeToRopeImpl(const google::protobuf::MessageLite& msg, const TVector<TRope>& payload, NInterconnect::NRdma::IMemPool* pool);
std::optional<TRope> SerializeToRopeImpl(const google::protobuf::MessageLite& msg, const TVector<TRope>& payload, IRcBufAllocator* allocator);
ui32 CalculateSerializedSizeImpl(const TVector<TRope> &payload, ssize_t recordSize);
TEventSerializationInfo CreateSerializationInfoImpl(size_t preserializedSize, bool allowExternalDataChannel, const TVector<TRope> &payload, ssize_t recordSize);

Expand Down Expand Up @@ -205,8 +205,8 @@ namespace NActors {
return CalculateSerializedSizeImpl(Payload, Record.ByteSize());
}

std::optional<TRope> SerializeToRope(NInterconnect::NRdma::IMemPool* pool) const override {
return NActors::SerializeToRopeImpl(Record, Payload, pool);
std::optional<TRope> SerializeToRope(IRcBufAllocator* allocator) const override {
return NActors::SerializeToRopeImpl(Record, Payload, allocator);
}

static TEv* Load(const TEventSerializedData *input) {
Expand Down
101 changes: 47 additions & 54 deletions ydb/library/actors/interconnect/interconnect_channel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,6 @@ namespace NActors {
Metrics->UpdateIcQueueTimeHistogram(duration.MicroSeconds());
}
event.Span && event.Span.Event("FeedBuf:INITIAL");
SendViaRdma.reset();
if (event.Buffer) {
State = EState::BODY;
Iter = event.Buffer->GetBeginIter();
Expand All @@ -105,22 +104,15 @@ namespace NActors {
} else if (Params.UseExternalDataChannel && !SerializationInfo->Sections.empty()) {
State = EState::SECTIONS;
SectionIndex = 0;
XXH3_64bits_reset(&RdmaChecksumState);

size_t totalSize = 0;
// It is possible to have event without payload. Such events has only one section.
// We do not send such events via rdma.
bool sendViaRdma = Params.UseRdma && RdmaMemPool && SerializationInfo->Sections.size() > 2;
// Check each section can be send via rdma
bool sendViaRdma = false;
// Check if any section can be sent via rdma
for (const auto& section : SerializationInfo->Sections) {
sendViaRdma &= section.IsRdmaCapable;
totalSize += section.Size;
sendViaRdma |= section.IsRdmaCapable;
}
if (sendViaRdma) {
Y_ABORT_UNLESS(totalSize, "got empty sz, sections: %d type: %d ", SerializationInfo->Sections.size(), event.Event->Type());
NActorsInterconnect::TRdmaCreds rdmaCreds;
ui32 checkSum = 0;
if (SerializeEventRdma(event, rdmaCreds, task.Params.ChecksumRdmaEvent ? &checkSum : nullptr, rdmaDeviceIndex)) {
SendViaRdma.emplace(TRdmaSerializationArtifacts{std::move(rdmaCreds), checkSum});
if (sendViaRdma && Params.UseRdma && RdmaMemPool) {
if (SerializeEventRdma(event)) {
Chunker.DiscardEvent();
}
}
Expand Down Expand Up @@ -168,8 +160,7 @@ namespace NActors {
p += NInterconnect::NDetail::SerializeNumber(section.Alignment, p);
if (section.IsInline && Params.UseXdcShuffle) {
type = static_cast<ui8>(EXdcCommand::DECLARE_SECTION_INLINE);
}
if (SendViaRdma) {
} else if (section.IsRdmaCapable) {
type = static_cast<ui8>(EXdcCommand::DECLARE_SECTION_RDMA);
}
Y_ABORT_UNLESS(p <= std::end(sectionInfo));
Expand Down Expand Up @@ -268,16 +259,18 @@ namespace NActors {
if (!Params.UseExternalDataChannel || sections.empty()) {
// all data goes inline
IsPartInline = true;
IsPartRdma = false;
PartLenRemain = Max<size_t>();
} else if (!Params.UseXdcShuffle || SendViaRdma) {
} else if (!Params.UseXdcShuffle) {
// when UseXdcShuffle feature is not supported by the remote side, we transfer whole event over XDC
// also when we use RDMA, we transfer whole over RDMA
IsPartInline = false;
IsPartRdma = false;
PartLenRemain = Max<size_t>();
} else {
Y_ABORT_UNLESS(SectionIndex < sections.size());
IsPartInline = sections[SectionIndex].IsInline;
while (SectionIndex < sections.size() && IsPartInline == sections[SectionIndex].IsInline) {
IsPartRdma = sections[SectionIndex].IsRdmaCapable;
while (SectionIndex < sections.size() && IsPartInline == sections[SectionIndex].IsInline && IsPartRdma == sections[SectionIndex].IsRdmaCapable) {
PartLenRemain += sections[SectionIndex].Size;
++SectionIndex;
}
Expand All @@ -288,8 +281,8 @@ namespace NActors {
std::optional<bool> complete = false;
if (IsPartInline) {
complete = FeedInlinePayload(task, event);
} else if (SendViaRdma) {
complete = FeedRdmaPayload(task, event, rdmaDeviceIndex);
} else if (IsPartRdma) {
complete = FeedRdmaPayload(task, event, rdmaDeviceIndex, task.Params.ChecksumRdmaEvent);
} else {
complete = FeedExternalPayload(task, event);
}
Expand Down Expand Up @@ -325,56 +318,54 @@ namespace NActors {
return complete;
}

bool TEventOutputChannel::SerializeEventRdma(TEventHolder& event, NActorsInterconnect::TRdmaCreds& rdmaCreds,
ui32* checksum, ssize_t rdmaDeviceIndex)
{
bool TEventOutputChannel::SerializeEventRdma(TEventHolder& event) {
if (!event.Buffer && event.Event) {
std::optional<TRope> rope = event.Event->SerializeToRope(RdmaMemPool.get());
std::optional<TRope> rope = event.Event->SerializeToRope(GetDefaultRcBufAllocator());
if (!rope) {
return false; // serialization failed
}
event.Buffer = MakeIntrusive<TEventSerializedData>(
std::move(*rope), event.Event->CreateSerializationInfo()
);
event.Event = nullptr;
Iter = event.Buffer->GetBeginIter();
}

XXH3_state_t state;
if (checksum) {
XXH3_64bits_reset(&state);
}
return true;
}

std::optional<bool> TEventOutputChannel::FeedRdmaPayload(TTcpPacketOutTask& task, TEventHolder& event, ssize_t rdmaDeviceIndex, bool checksumming) {
Y_ABORT_UNLESS(rdmaDeviceIndex >= 0);

if (event.Buffer) {
for (; Iter.Valid(); ++Iter) {
Y_ABORT_UNLESS(event.Buffer);
if (RdmaCredsBuffer.CredsSize() == 0) {
auto prevIter = Iter;
size_t prevPartLenRemain = PartLenRemain;
for (; Iter.Valid() && PartLenRemain; ++Iter) {
TRcBuf buf = Iter.GetChunk();
auto memReg = NInterconnect::NRdma::TryExtractFromRcBuf(buf);
if (memReg.Empty()) {
// TODO: may be copy to RDMA buffer ?????
Iter = event.Buffer->GetBeginIter();
Iter = prevIter;
IsPartRdma = false;
RdmaCredsBuffer.Clear();
PartLenRemain = prevPartLenRemain;
return false;
}
if (checksum) {
XXH3_64bits_update(&state, buf.GetData(), buf.GetSize());
if (checksumming) {
XXH3_64bits_update(&RdmaChecksumState, buf.GetData(), buf.GetSize());
}
auto cred = rdmaCreds.AddCreds();
auto cred = RdmaCredsBuffer.AddCreds();
cred->SetAddress(reinterpret_cast<ui64>(memReg.GetAddr()));
cred->SetSize(memReg.GetSize());
cred->SetRkey(memReg.GetRKey(rdmaDeviceIndex));

Copy link

Copilot AI Nov 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nitpick] The trailing whitespace on line 368 should be removed to maintain code cleanliness and consistency.

Suggested change

Copilot uses AI. Check for mistakes.
event.EventActuallySerialized += buf.GetSize();
PartLenRemain -= buf.GetSize();
}
}

if (checksum) {
*checksum = XXH3_64bits_digest(&state);
}
return true;
}

std::optional<bool> TEventOutputChannel::FeedRdmaPayload(TTcpPacketOutTask& task, TEventHolder& event, ssize_t rdmaDeviceIndex) {
Y_ABORT_UNLESS(rdmaDeviceIndex >= 0);
const NActorsInterconnect::TRdmaCreds& rdmaCreds = SendViaRdma->RdmaCreds;
ui32 checkSum = SendViaRdma->CheckSum;

ui16 credsSerializedSize = rdmaCreds.ByteSizeLong();
Y_ABORT_UNLESS(PartLenRemain == 0);

ui16 credsSerializedSize = RdmaCredsBuffer.ByteSizeLong();
// Part = | TChannelPart | EXdcCommand::RDMA_READ | rdmaCreds.Size | rdmaCreds | checkSum |
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's rename this to something like CumulativeChecksum to show what it is.

size_t partSize = sizeof(TChannelPart) + sizeof(ui8) + sizeof(ui16) + credsSerializedSize + sizeof(ui32);
Y_ABORT_UNLESS(partSize < 4096);
Expand All @@ -396,20 +387,22 @@ namespace NActors {
ptr += sizeof(ui16);

ui32 payloadSz = 0;
for (const auto& rdmaCred : rdmaCreds.GetCreds()) {
for (const auto& rdmaCred : RdmaCredsBuffer.GetCreds()) {
payloadSz += rdmaCred.GetSize();
}

Y_ABORT_UNLESS(rdmaCreds.SerializePartialToArray(ptr, credsSerializedSize));
Y_ABORT_UNLESS(RdmaCredsBuffer.SerializePartialToArray(ptr, credsSerializedSize));
ptr += credsSerializedSize;
WriteUnaligned<ui32>(ptr, checkSum);
OutputQueueSize -= event.EventSerializedSize;
WriteUnaligned<ui32>(ptr, checksumming ? XXH3_64bits_digest(&RdmaChecksumState) : 0);
OutputQueueSize -= payloadSz;

task.Write<false>(buffer, partSize);

task.AttachRdmaPayloadSize(payloadSz);

return true;
RdmaCredsBuffer.Clear();

return !Iter.Valid();
Copy link

Copilot AI Nov 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The return value logic is incorrect. The function should return true when the current part is complete (i.e., when PartLenRemain == 0), not just when the entire buffer is consumed. When UseXdcShuffle is enabled with multiple RDMA-capable sections, a single section may be smaller than the entire buffer. The correct return should be !Iter.Valid() || PartLenRemain == 0 to indicate the part is complete either when all data is consumed OR when the current part's bytes are fully processed.

Suggested change
return !Iter.Valid();
return !Iter.Valid() || PartLenRemain == 0;

Copilot uses AI. Check for mistakes.
}

std::optional<bool> TEventOutputChannel::FeedExternalPayload(TTcpPacketOutTask& task, TEventHolder& event) {
Expand Down
12 changes: 5 additions & 7 deletions ydb/library/actors/interconnect/interconnect_channel.h
Original file line number Diff line number Diff line change
Expand Up @@ -146,24 +146,22 @@ namespace NActors {
TEventSerializationInfo SerializationInfoContainer;
const TEventSerializationInfo *SerializationInfo = nullptr;
bool IsPartInline = false;
bool IsPartRdma = false;
NActorsInterconnect::TRdmaCreds RdmaCredsBuffer;
size_t PartLenRemain = 0;
size_t SectionIndex = 0;
std::vector<char> XdcData;
std::shared_ptr<NInterconnect::NRdma::IMemPool> RdmaMemPool;
struct TRdmaSerializationArtifacts {
NActorsInterconnect::TRdmaCreds RdmaCreds;
ui32 CheckSum = 0;
};
std::optional<TRdmaSerializationArtifacts> SendViaRdma;
XXH3_state_t RdmaChecksumState;

template<bool External>
bool SerializeEvent(TTcpPacketOutTask& task, TEventHolder& event, size_t *bytesSerialized);
bool SerializeEventRdma(TEventHolder& event, NActorsInterconnect::TRdmaCreds& rdmaCreds, ui32* checkSum, ssize_t rdmaDeviceIndex);
bool SerializeEventRdma(TEventHolder& event);

bool FeedPayload(TTcpPacketOutTask& task, TEventHolder& event, ssize_t rdmaDeviceIndex);
std::optional<bool> FeedInlinePayload(TTcpPacketOutTask& task, TEventHolder& event);
std::optional<bool> FeedExternalPayload(TTcpPacketOutTask& task, TEventHolder& event);
std::optional<bool> FeedRdmaPayload(TTcpPacketOutTask& task, TEventHolder& event, ssize_t rdmaDeviceIndex);
std::optional<bool> FeedRdmaPayload(TTcpPacketOutTask& task, TEventHolder& event, ssize_t rdmaDeviceIndex, bool checksumming);

bool FeedDescriptor(TTcpPacketOutTask& task, TEventHolder& event);

Expand Down
21 changes: 13 additions & 8 deletions ydb/library/actors/interconnect/interconnect_tcp_input_session.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -898,18 +898,23 @@ namespace NActors {
for (const auto&& [data, size] : payload) {
checksum = Crc32cExtendMSanCompatible(checksum, data, size);
}
} else if (pendingEvent.RdmaCheckSum) {
if (checksum != descr.Checksum) {
LOG_CRIT_IC_SESSION("ICIS05", "event checksum error Type# 0x%08" PRIx32, descr.Type);
throw TExReestablishConnection{TDisconnectReason::ChecksumError()};
}
}
if (pendingEvent.RdmaCheckSum) {
XXH3_state_t state;
XXH3_64bits_reset(&state);
for (const auto&& [data, size] : payload) {
XXH3_64bits_update(&state, data, size);
for (auto iter = payload.Begin(); iter.Valid(); ++iter) {
auto memRegion = NInterconnect::NRdma::TryExtractFromRcBuf(iter.GetChunk());
if (!memRegion.Empty()) {
XXH3_64bits_update(&state, memRegion.GetAddr(), memRegion.GetSize());
}
}
checksum = XXH3_64bits_digest(&state);
}
ui32 expectedChecksum = descr.Checksum ?: pendingEvent.RdmaCheckSum;
if (expectedChecksum) {
if (checksum != expectedChecksum) {
LOG_CRIT_IC_SESSION("ICIS05", "event checksum error Type# 0x%08" PRIx32, descr.Type);
if (checksum != pendingEvent.RdmaCheckSum) {
LOG_CRIT_IC_SESSION("ICIS05", "event rdma checksum error Type# 0x%08" PRIx32, descr.Type);
throw TExReestablishConnection{TDisconnectReason::ChecksumError()};
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1039,7 +1039,7 @@ namespace NActors {
const ui32 gross = grossAfter - grossBefore;
channel->UnaccountedTraffic += gross;
const ui64 netAfter = channel->GetBufferedAmountOfData();
Y_DEBUG_ABORT_UNLESS(netAfter <= netBefore); // net amount should shrink
Y_DEBUG_ABORT_UNLESS(netAfter <= netBefore, "netBefore# %" PRIu64 " netAfter# %" PRIu64, netBefore, netAfter); // net amount should shrink
const ui64 net = netBefore - netAfter; // number of net bytes serialized

// adjust metrics for local and global queue size
Expand Down
Loading