Skip to content

Commit ade9f81

Browse files
committed
Add checksum support
1 parent c779a59 commit ade9f81

File tree

3 files changed

+19
-19
lines changed

3 files changed

+19
-19
lines changed

ydb/library/actors/interconnect/interconnect_channel.cpp

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -104,14 +104,14 @@ namespace NActors {
104104
} else if (Params.UseExternalDataChannel && !SerializationInfo->Sections.empty()) {
105105
State = EState::SECTIONS;
106106
SectionIndex = 0;
107+
XXH3_64bits_reset(&RdmaChecksumState);
107108

108109
bool sendViaRdma = false;
109110
// Check if any section can be send via rdma
110111
for (const auto& section : SerializationInfo->Sections) {
111112
sendViaRdma |= section.IsRdmaCapable;
112113
}
113114
if (sendViaRdma && Params.UseRdma && RdmaMemPool) {
114-
NActorsInterconnect::TRdmaCreds rdmaCreds;
115115
if (SerializeEventRdma(event)) {
116116
Chunker.DiscardEvent();
117117
}
@@ -337,11 +337,6 @@ namespace NActors {
337337
std::optional<bool> TEventOutputChannel::FeedRdmaPayload(TTcpPacketOutTask& task, TEventHolder& event, ssize_t rdmaDeviceIndex, bool checksumming) {
338338
Y_ABORT_UNLESS(rdmaDeviceIndex >= 0);
339339

340-
XXH3_state_t state;
341-
if (checksumming) {
342-
XXH3_64bits_reset(&state);
343-
}
344-
345340
Y_ABORT_UNLESS(event.Buffer);
346341
if (RdmaCredsBuffer.CredsSize() == 0) {
347342
auto prevIter = Iter;
@@ -357,7 +352,7 @@ namespace NActors {
357352
return false;
358353
}
359354
if (checksumming) {
360-
XXH3_64bits_update(&state, buf.GetData(), buf.GetSize());
355+
XXH3_64bits_update(&RdmaChecksumState, buf.GetData(), buf.GetSize());
361356
}
362357
auto cred = RdmaCredsBuffer.AddCreds();
363358
cred->SetAddress(reinterpret_cast<ui64>(memReg.GetAddr()));
@@ -398,7 +393,7 @@ namespace NActors {
398393

399394
Y_ABORT_UNLESS(RdmaCredsBuffer.SerializePartialToArray(ptr, credsSerializedSize));
400395
ptr += credsSerializedSize;
401-
WriteUnaligned<ui32>(ptr, checksumming ? XXH3_64bits_digest(&state) : 0);
396+
WriteUnaligned<ui32>(ptr, checksumming ? XXH3_64bits_digest(&RdmaChecksumState) : 0);
402397
OutputQueueSize -= payloadSz;
403398

404399
task.Write<false>(buffer, partSize);

ydb/library/actors/interconnect/interconnect_channel.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,7 @@ namespace NActors {
152152
size_t SectionIndex = 0;
153153
std::vector<char> XdcData;
154154
std::shared_ptr<NInterconnect::NRdma::IMemPool> RdmaMemPool;
155+
XXH3_state_t RdmaChecksumState;
155156

156157
template<bool External>
157158
bool SerializeEvent(TTcpPacketOutTask& task, TEventHolder& event, size_t *bytesSerialized);

ydb/library/actors/interconnect/interconnect_tcp_input_session.cpp

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -898,21 +898,25 @@ namespace NActors {
898898
for (const auto&& [data, size] : payload) {
899899
checksum = Crc32cExtendMSanCompatible(checksum, data, size);
900900
}
901-
} else if (pendingEvent.RdmaCheckSum) {
901+
if (checksum != descr.Checksum) {
902+
LOG_CRIT_IC_SESSION("ICIS05", "event checksum error Type# 0x%08" PRIx32, descr.Type);
903+
throw TExReestablishConnection{TDisconnectReason::ChecksumError()};
904+
}
905+
}
906+
if (pendingEvent.RdmaCheckSum) {
902907
XXH3_state_t state;
903908
XXH3_64bits_reset(&state);
904-
for (const auto&& [data, size] : payload) {
905-
XXH3_64bits_update(&state, data, size);
909+
for (auto iter = payload.Begin(); iter.Valid(); ++iter) {
910+
auto memRegion = NInterconnect::NRdma::TryExtractFromRcBuf(iter.GetChunk());
911+
if (!memRegion.Empty()) {
912+
XXH3_64bits_update(&state, memRegion.GetAddr(), memRegion.GetSize());
913+
}
906914
}
907915
checksum = XXH3_64bits_digest(&state);
908-
}
909-
ui32 expectedChecksum = descr.Checksum ?: pendingEvent.RdmaCheckSum;
910-
if (expectedChecksum) {
911-
Y_UNUSED(checksum, expectedChecksum);
912-
// if (checksum != expectedChecksum) {
913-
// LOG_CRIT_IC_SESSION("ICIS05", "event checksum error Type# 0x%08" PRIx32, descr.Type);
914-
// throw TExReestablishConnection{TDisconnectReason::ChecksumError()};
915-
// }
916+
if (checksum != pendingEvent.RdmaCheckSum) {
917+
LOG_CRIT_IC_SESSION("ICIS05", "event rdma checksum error Type# 0x%08" PRIx32, descr.Type);
918+
throw TExReestablishConnection{TDisconnectReason::ChecksumError()};
919+
}
916920
}
917921
pendingEvent.SerializationInfo.IsExtendedFormat = descr.Flags & IEventHandle::FlagExtendedFormat;
918922

0 commit comments

Comments
 (0)