Skip to content

Commit 0884404

Browse files
gridnevvvitydbot
authored andcommitted
quicker result rows release to avoid potential overload (#29362)
1 parent 3f8c347 commit 0884404

File tree

1 file changed

+81
-58
lines changed

1 file changed

+81
-58
lines changed

ydb/core/kqp/runtime/kqp_stream_lookup_worker.cpp

Lines changed: 81 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -560,17 +560,23 @@ class TKqpJoinRows : public TKqpStreamLookupWorker {
560560
row.ComputeSize = NYql::NDq::TDqDataSerializer::EstimateSize(row.Data, GetLeftRowType());
561561
ui64 joinKeyId = JoinKeySeqNo++;
562562
TOwnedCellVec cellVec(std::move(joinKeyCells));
563-
ResultRowsBySeqNo[rowSeqNo].AcceptLeftRow(std::move(row), rowSeqNo, firstRow, lastRow);
563+
auto [resIt, _] = ResultRowsBySeqNo.emplace(
564+
rowSeqNo, TResultBatch(KeepRowsOrder() ? nullptr : &FlushedResultRows, HolderFactory, rowSeqNo, std::move(row)));
565+
resIt->second.AcceptLeftRow(firstRow, lastRow);
564566
if (!IsKeyAllowed(cellVec)) {
565-
ResultRowsBySeqNo[rowSeqNo].AddJoinKey(joinKeyId);
566-
ResultRowsBySeqNo[rowSeqNo].OnJoinKeyFinished(HolderFactory, joinKeyId);
567+
resIt->second.AddJoinKey(joinKeyId);
568+
resIt->second.OnJoinKeyFinished(joinKeyId);
569+
if (resIt->second.Completed()) {
570+
ResultRowsBySeqNo.erase(resIt);
571+
}
572+
567573
} else {
568574
auto [it, success] = PendingLeftRowsByKey.emplace(cellVec, TJoinKeyInfo(joinKeyId));
569575
if (success) {
570576
UnprocessedRows.emplace_back(TUnprocessedLeftRow(cellVec));
571577
}
572578
it->second.ResultSeqNos.push_back(rowSeqNo);
573-
ResultRowsBySeqNo[rowSeqNo].AddJoinKey(it->second.JoinKeyId);
579+
resIt->second.AddJoinKey(it->second.JoinKeyId);
574580
}
575581
}
576582

@@ -752,7 +758,12 @@ class TKqpJoinRows : public TKqpStreamLookupWorker {
752758
auto it = PendingLeftRowsByKey.find(unprocessedRow.JoinKey);
753759
YQL_ENSURE(it != PendingLeftRowsByKey.end());
754760
for(ui64 seqNo : it->second.ResultSeqNos) {
755-
ResultRowsBySeqNo.at(seqNo).OnJoinKeyFinished(HolderFactory, it->second.JoinKeyId);
761+
auto resultRowsIt = ResultRowsBySeqNo.find(seqNo);
762+
YQL_ENSURE(resultRowsIt != ResultRowsBySeqNo.end());
763+
resultRowsIt->second.OnJoinKeyFinished(it->second.JoinKeyId);
764+
if (resultRowsIt->second.Completed()) {
765+
ResultRowsBySeqNo.erase(resultRowsIt);
766+
}
756767
}
757768
PendingLeftRowsByKey.erase(it);
758769
}
@@ -801,13 +812,9 @@ class TKqpJoinRows : public TKqpStreamLookupWorker {
801812
}
802813

803814
bool HasPendingResults() final {
804-
auto nextSeqNo = KeepRowsOrder() ? ResultRowsBySeqNo.find(CurrentResultSeqNo) : ResultRowsBySeqNo.begin();
805-
806-
if (nextSeqNo != ResultRowsBySeqNo.end() && !nextSeqNo->second.Rows.empty()) {
807-
return true;
808-
}
815+
OrderedFlushRows();
809816

810-
return false;
817+
return !FlushedResultRows.empty();
811818
}
812819

813820
void AddResult(TShardReadResult result) final {
@@ -834,8 +841,7 @@ class TKqpJoinRows : public TKqpStreamLookupWorker {
834841
auto rightRow = leftRowIt->second.AttachRightRow(HolderFactory, Settings, ReadColumns, row);
835842
for(auto seqNo: leftRowIt->second.ResultSeqNos) {
836843
auto& resultRows = ResultRowsBySeqNo.at(seqNo);
837-
resultRows.TryBuildResultRow(HolderFactory, rightRow);
838-
YQL_ENSURE(IsRowSeqNoValid(seqNo));
844+
resultRows.TryBuildResultRow(rightRow);
839845
}
840846
}
841847

@@ -849,7 +855,12 @@ class TKqpJoinRows : public TKqpStreamLookupWorker {
849855
YQL_ENSURE(leftRowIt != PendingLeftRowsByKey.end());
850856
if (leftRowIt->second.FinishReadId(record.GetReadId())) {
851857
for(ui64 seqNo : leftRowIt->second.ResultSeqNos) {
852-
ResultRowsBySeqNo.at(seqNo).OnJoinKeyFinished(HolderFactory, leftRowIt->second.JoinKeyId);
858+
auto resultRowsIt = ResultRowsBySeqNo.find(seqNo);
859+
YQL_ENSURE(resultRowsIt != ResultRowsBySeqNo.end());
860+
resultRowsIt->second.OnJoinKeyFinished(leftRowIt->second.JoinKeyId);
861+
if (resultRowsIt->second.Completed()) {
862+
ResultRowsBySeqNo.erase(resultRowsIt);
863+
}
853864
}
854865

855866
PendingLeftRowsByKey.erase(leftRowIt);
@@ -866,7 +877,8 @@ class TKqpJoinRows : public TKqpStreamLookupWorker {
866877
&& UnprocessedKeys.empty()
867878
&& ReadStateByReadId.empty()
868879
&& ResultRowsBySeqNo.empty()
869-
&& PendingLeftRowsByKey.empty();
880+
&& PendingLeftRowsByKey.empty()
881+
&& FlushedResultRows.empty();
870882
}
871883

872884
void ResetRowsProcessing(ui64 readId) final {
@@ -907,45 +919,46 @@ class TKqpJoinRows : public TKqpStreamLookupWorker {
907919
ReadStateByReadId.erase(readIt);
908920
}
909921

910-
TReadResultStats ReplyResult(NKikimr::NMiniKQL::TUnboxedValueBatch& batch, i64 freeSpace) final {
911-
TReadResultStats resultStats;
912-
batch.clear();
922+
void OrderedFlushRows() {
923+
if (!KeepRowsOrder()) {
924+
return;
925+
}
913926

914-
auto getNextResult = [&]() {
915-
if (!KeepRowsOrder()) {
916-
return ResultRowsBySeqNo.begin();
927+
while(true) {
928+
auto it = ResultRowsBySeqNo.find(CurrentResultSeqNo);
929+
if (it == ResultRowsBySeqNo.end()) {
930+
break;
917931
}
918932

919-
return ResultRowsBySeqNo.find(CurrentResultSeqNo);
920-
};
921-
922-
while (!resultStats.SizeLimitExceeded) {
923-
auto resultIt = getNextResult();
924-
if (resultIt == ResultRowsBySeqNo.end()) {
933+
it->second.FlushRows(FlushedResultRows);
934+
if (!it->second.Completed()) {
925935
break;
926936
}
927937

928-
auto& result = resultIt->second;
938+
++CurrentResultSeqNo;
939+
ResultRowsBySeqNo.erase(it);
940+
}
929941

930-
while(!result.Rows.empty()) {
931-
TResultBatch::TResultRow& row = result.Rows.front();
942+
return;
943+
}
932944

933-
if (resultStats.ResultRowsCount && resultStats.ResultBytesCount + row.Stats.ResultBytesCount > (ui64)freeSpace) {
934-
resultStats.SizeLimitExceeded = true;
935-
break;
936-
}
945+
TReadResultStats ReplyResult(NKikimr::NMiniKQL::TUnboxedValueBatch& batch, i64 freeSpace) final {
946+
TReadResultStats resultStats;
947+
batch.clear();
937948

938-
batch.emplace_back(std::move(row.Data));
939-
result.Rows.pop_front();
940-
resultStats.Add(row.Stats);
941-
}
949+
OrderedFlushRows();
942950

943-
if (result.Completed()) {
944-
ResultRowsBySeqNo.erase(resultIt);
945-
++CurrentResultSeqNo;
946-
} else {
951+
while (!resultStats.SizeLimitExceeded && !FlushedResultRows.empty()) {
952+
TResultBatch::TResultRow& row = FlushedResultRows.front();
953+
954+
if (resultStats.ResultRowsCount && resultStats.ResultBytesCount + row.Stats.ResultBytesCount > (ui64)freeSpace) {
955+
resultStats.SizeLimitExceeded = true;
947956
break;
948957
}
958+
959+
batch.emplace_back(std::move(row.Data));
960+
FlushedResultRows.pop_front();
961+
resultStats.Add(row.Stats);
949962
}
950963

951964
return resultStats;
@@ -959,6 +972,7 @@ class TKqpJoinRows : public TKqpStreamLookupWorker {
959972
UnprocessedRows.clear();
960973
PendingLeftRowsByKey.clear();
961974
ResultRowsBySeqNo.clear();
975+
FlushedResultRows.clear();
962976
}
963977
private:
964978
struct TJoinKeyInfo {
@@ -1021,6 +1035,16 @@ class TKqpJoinRows : public TKqpStreamLookupWorker {
10211035
TReadResultStats Stats;
10221036
};
10231037

1038+
explicit TResultBatch(std::deque<TResultRow>* result, const NMiniKQL::THolderFactory& holderFactory, ui64 seqNo, TSizedUnboxedValue&& leftRow)
1039+
: Result(result)
1040+
, HolderFactory(holderFactory)
1041+
, RowSeqNo(seqNo)
1042+
, LeftRow(std::move(leftRow))
1043+
{
1044+
}
1045+
1046+
std::deque<TResultRow>* Result;
1047+
const NMiniKQL::THolderFactory& HolderFactory;
10241048
std::deque<TResultRow> Rows;
10251049
std::unordered_set<ui64> PendingJoinKeys;
10261050
ui32 FirstUnprocessedRow = 0;
@@ -1036,21 +1060,19 @@ class TKqpJoinRows : public TKqpStreamLookupWorker {
10361060
return LastRow && PendingJoinKeys.empty();
10371061
}
10381062

1039-
void OnJoinKeyFinished(const NMiniKQL::THolderFactory& HolderFactory, ui64 joinKeyId) {
1063+
void OnJoinKeyFinished(ui64 joinKeyId) {
10401064
auto it = PendingJoinKeys.find(joinKeyId);
10411065
if (it != PendingJoinKeys.end()) {
10421066
PendingJoinKeys.erase(joinKeyId);
1043-
TryBuildResultRow(HolderFactory);
1067+
TryBuildResultRow();
10441068
}
10451069
}
10461070

10471071
void AddJoinKey(ui64 joinKeyId) {
10481072
PendingJoinKeys.emplace(joinKeyId);
10491073
}
10501074

1051-
void AcceptLeftRow(TSizedUnboxedValue&& leftRow, ui64 seqNo, bool firstRow, bool lastRow) {
1052-
RowSeqNo = seqNo;
1053-
LeftRow = std::move(leftRow);
1075+
void AcceptLeftRow(bool firstRow, bool lastRow) {
10541076
if (firstRow)
10551077
FirstRow = true;
10561078

@@ -1062,7 +1084,12 @@ class TKqpJoinRows : public TKqpStreamLookupWorker {
10621084
return Rows.empty() && FirstRow && LastRow && PendingJoinKeys.empty();
10631085
}
10641086

1065-
void TryBuildResultRow(const NMiniKQL::THolderFactory& HolderFactory, std::optional<TSizedUnboxedValue> row = {}) {
1087+
void FlushRows(std::deque<TResultRow>& externalStorage) {
1088+
externalStorage.insert(externalStorage.end(), Rows.begin(), Rows.end());
1089+
Rows.clear();
1090+
}
1091+
1092+
void TryBuildResultRow(std::optional<TSizedUnboxedValue> row = {}) {
10661093
if (RightRow.Data.HasValue() || ProcessedAllJoinKeys()) {
10671094
TReadResultStats rowStats;
10681095
NUdf::TUnboxedValue resultRow;
@@ -1082,7 +1109,11 @@ class TKqpJoinRows : public TKqpStreamLookupWorker {
10821109
rowStats.ResultBytesCount += LeftRow.ComputeSize + RightRow.ComputeSize;
10831110

10841111
++ProcessedRows;
1085-
Rows.emplace_back(std::move(resultRow), std::move(rowStats));
1112+
if (Result) {
1113+
Result->emplace_back(std::move(resultRow), std::move(rowStats));
1114+
} else {
1115+
Rows.emplace_back(std::move(resultRow), std::move(rowStats));
1116+
}
10861117
}
10871118

10881119
if (row.has_value()) {
@@ -1099,15 +1130,6 @@ class TKqpJoinRows : public TKqpStreamLookupWorker {
10991130
return Settings.KeepRowsOrder;
11001131
}
11011132

1102-
bool IsRowSeqNoValid(const ui64& seqNo) const {
1103-
if (!KeepRowsOrder()) {
1104-
return true;
1105-
}
1106-
1107-
// we should check row seqNo only if we need to keep the order
1108-
return seqNo >= CurrentResultSeqNo;
1109-
}
1110-
11111133
void FillReadRequest(ui64 readId, THolder<TEvDataShard::TEvRead>& request, const std::vector<TOwnedTableRange>& ranges) {
11121134
auto& record = request->Record;
11131135

@@ -1198,6 +1220,7 @@ class TKqpJoinRows : public TKqpStreamLookupWorker {
11981220
std::unordered_map<ui64, TReadState> ReadStateByReadId;
11991221
absl::flat_hash_map<TOwnedCellVec, TJoinKeyInfo, NKikimr::TCellVectorsHash, NKikimr::TCellVectorsEquals> PendingLeftRowsByKey;
12001222
std::unordered_map<ui64, TResultBatch> ResultRowsBySeqNo;
1223+
std::deque<TResultBatch::TResultRow> FlushedResultRows;
12011224
ui64 InputRowSeqNo = 0;
12021225
ui64 InputRowSeqNoLast = 0;
12031226
ui64 JoinKeySeqNo = 0;

0 commit comments

Comments
 (0)