Skip to content

Commit b52dcd2

Browse files
authored
quicker result rows release to avoid potential overload (#29362)
1 parent: 2c39db9 · commit: b52dcd2

File tree

1 file changed: +81 −58 lines changed

ydb/core/kqp/runtime/kqp_stream_lookup_worker.cpp

Lines changed: 81 additions & 58 deletions
Original file line number | Diff line number | Diff line change
@@ -572,17 +572,23 @@ class TKqpJoinRows : public TKqpStreamLookupWorker {
572572
row.ComputeSize = NYql::NDq::TDqDataSerializer::EstimateSize(row.Data, GetLeftRowType());
573573
ui64 joinKeyId = JoinKeySeqNo++;
574574
TOwnedCellVec cellVec(std::move(joinKeyCells));
575-
ResultRowsBySeqNo[rowSeqNo].AcceptLeftRow(std::move(row), rowSeqNo, firstRow, lastRow);
575+
auto [resIt, _] = ResultRowsBySeqNo.emplace(
576+
rowSeqNo, TResultBatch(KeepRowsOrder() ? nullptr : &FlushedResultRows, HolderFactory, rowSeqNo, std::move(row)));
577+
resIt->second.AcceptLeftRow(firstRow, lastRow);
576578
if (!IsKeyAllowed(cellVec)) {
577-
ResultRowsBySeqNo[rowSeqNo].AddJoinKey(joinKeyId);
578-
ResultRowsBySeqNo[rowSeqNo].OnJoinKeyFinished(HolderFactory, joinKeyId);
579+
resIt->second.AddJoinKey(joinKeyId);
580+
resIt->second.OnJoinKeyFinished(joinKeyId);
581+
if (resIt->second.Completed()) {
582+
ResultRowsBySeqNo.erase(resIt);
583+
}
584+
579585
} else {
580586
auto [it, success] = PendingLeftRowsByKey.emplace(cellVec, TJoinKeyInfo(joinKeyId));
581587
if (success) {
582588
UnprocessedRows.emplace_back(TUnprocessedLeftRow(cellVec));
583589
}
584590
it->second.ResultSeqNos.push_back(rowSeqNo);
585-
ResultRowsBySeqNo[rowSeqNo].AddJoinKey(it->second.JoinKeyId);
591+
resIt->second.AddJoinKey(it->second.JoinKeyId);
586592
}
587593
}
588594

@@ -764,7 +770,12 @@ class TKqpJoinRows : public TKqpStreamLookupWorker {
764770
auto it = PendingLeftRowsByKey.find(unprocessedRow.JoinKey);
765771
YQL_ENSURE(it != PendingLeftRowsByKey.end());
766772
for(ui64 seqNo : it->second.ResultSeqNos) {
767-
ResultRowsBySeqNo.at(seqNo).OnJoinKeyFinished(HolderFactory, it->second.JoinKeyId);
773+
auto resultRowsIt = ResultRowsBySeqNo.find(seqNo);
774+
YQL_ENSURE(resultRowsIt != ResultRowsBySeqNo.end());
775+
resultRowsIt->second.OnJoinKeyFinished(it->second.JoinKeyId);
776+
if (resultRowsIt->second.Completed()) {
777+
ResultRowsBySeqNo.erase(resultRowsIt);
778+
}
768779
}
769780
PendingLeftRowsByKey.erase(it);
770781
}
@@ -813,13 +824,9 @@ class TKqpJoinRows : public TKqpStreamLookupWorker {
813824
}
814825

815826
bool HasPendingResults() final {
816-
auto nextSeqNo = KeepRowsOrder() ? ResultRowsBySeqNo.find(CurrentResultSeqNo) : ResultRowsBySeqNo.begin();
817-
818-
if (nextSeqNo != ResultRowsBySeqNo.end() && !nextSeqNo->second.Rows.empty()) {
819-
return true;
820-
}
827+
OrderedFlushRows();
821828

822-
return false;
829+
return !FlushedResultRows.empty();
823830
}
824831

825832
void AddResult(TShardReadResult result) final {
@@ -846,8 +853,7 @@ class TKqpJoinRows : public TKqpStreamLookupWorker {
846853
auto rightRow = leftRowIt->second.AttachRightRow(HolderFactory, Settings, ReadColumns, row);
847854
for(auto seqNo: leftRowIt->second.ResultSeqNos) {
848855
auto& resultRows = ResultRowsBySeqNo.at(seqNo);
849-
resultRows.TryBuildResultRow(HolderFactory, rightRow);
850-
YQL_ENSURE(IsRowSeqNoValid(seqNo));
856+
resultRows.TryBuildResultRow(rightRow);
851857
}
852858
}
853859

@@ -861,7 +867,12 @@ class TKqpJoinRows : public TKqpStreamLookupWorker {
861867
YQL_ENSURE(leftRowIt != PendingLeftRowsByKey.end());
862868
if (leftRowIt->second.FinishReadId(record.GetReadId())) {
863869
for(ui64 seqNo : leftRowIt->second.ResultSeqNos) {
864-
ResultRowsBySeqNo.at(seqNo).OnJoinKeyFinished(HolderFactory, leftRowIt->second.JoinKeyId);
870+
auto resultRowsIt = ResultRowsBySeqNo.find(seqNo);
871+
YQL_ENSURE(resultRowsIt != ResultRowsBySeqNo.end());
872+
resultRowsIt->second.OnJoinKeyFinished(leftRowIt->second.JoinKeyId);
873+
if (resultRowsIt->second.Completed()) {
874+
ResultRowsBySeqNo.erase(resultRowsIt);
875+
}
865876
}
866877

867878
PendingLeftRowsByKey.erase(leftRowIt);
@@ -878,7 +889,8 @@ class TKqpJoinRows : public TKqpStreamLookupWorker {
878889
&& UnprocessedKeys.empty()
879890
&& ReadStateByReadId.empty()
880891
&& ResultRowsBySeqNo.empty()
881-
&& PendingLeftRowsByKey.empty();
892+
&& PendingLeftRowsByKey.empty()
893+
&& FlushedResultRows.empty();
882894
}
883895

884896
void ResetRowsProcessing(ui64 readId) final {
@@ -919,45 +931,46 @@ class TKqpJoinRows : public TKqpStreamLookupWorker {
919931
ReadStateByReadId.erase(readIt);
920932
}
921933

922-
TReadResultStats ReplyResult(NKikimr::NMiniKQL::TUnboxedValueBatch& batch, i64 freeSpace) final {
923-
TReadResultStats resultStats;
924-
batch.clear();
934+
void OrderedFlushRows() {
935+
if (!KeepRowsOrder()) {
936+
return;
937+
}
925938

926-
auto getNextResult = [&]() {
927-
if (!KeepRowsOrder()) {
928-
return ResultRowsBySeqNo.begin();
939+
while(true) {
940+
auto it = ResultRowsBySeqNo.find(CurrentResultSeqNo);
941+
if (it == ResultRowsBySeqNo.end()) {
942+
break;
929943
}
930944

931-
return ResultRowsBySeqNo.find(CurrentResultSeqNo);
932-
};
933-
934-
while (!resultStats.SizeLimitExceeded) {
935-
auto resultIt = getNextResult();
936-
if (resultIt == ResultRowsBySeqNo.end()) {
945+
it->second.FlushRows(FlushedResultRows);
946+
if (!it->second.Completed()) {
937947
break;
938948
}
939949

940-
auto& result = resultIt->second;
950+
++CurrentResultSeqNo;
951+
ResultRowsBySeqNo.erase(it);
952+
}
941953

942-
while(!result.Rows.empty()) {
943-
TResultBatch::TResultRow& row = result.Rows.front();
954+
return;
955+
}
944956

945-
if (resultStats.ResultRowsCount && resultStats.ResultBytesCount + row.Stats.ResultBytesCount > (ui64)freeSpace) {
946-
resultStats.SizeLimitExceeded = true;
947-
break;
948-
}
957+
TReadResultStats ReplyResult(NKikimr::NMiniKQL::TUnboxedValueBatch& batch, i64 freeSpace) final {
958+
TReadResultStats resultStats;
959+
batch.clear();
949960

950-
batch.emplace_back(std::move(row.Data));
951-
result.Rows.pop_front();
952-
resultStats.Add(row.Stats);
953-
}
961+
OrderedFlushRows();
954962

955-
if (result.Completed()) {
956-
ResultRowsBySeqNo.erase(resultIt);
957-
++CurrentResultSeqNo;
958-
} else {
963+
while (!resultStats.SizeLimitExceeded && !FlushedResultRows.empty()) {
964+
TResultBatch::TResultRow& row = FlushedResultRows.front();
965+
966+
if (resultStats.ResultRowsCount && resultStats.ResultBytesCount + row.Stats.ResultBytesCount > (ui64)freeSpace) {
967+
resultStats.SizeLimitExceeded = true;
959968
break;
960969
}
970+
971+
batch.emplace_back(std::move(row.Data));
972+
FlushedResultRows.pop_front();
973+
resultStats.Add(row.Stats);
961974
}
962975

963976
return resultStats;
@@ -971,6 +984,7 @@ class TKqpJoinRows : public TKqpStreamLookupWorker {
971984
UnprocessedRows.clear();
972985
PendingLeftRowsByKey.clear();
973986
ResultRowsBySeqNo.clear();
987+
FlushedResultRows.clear();
974988
}
975989
private:
976990
struct TJoinKeyInfo {
@@ -1033,6 +1047,16 @@ class TKqpJoinRows : public TKqpStreamLookupWorker {
10331047
TReadResultStats Stats;
10341048
};
10351049

1050+
explicit TResultBatch(std::deque<TResultRow>* result, const NMiniKQL::THolderFactory& holderFactory, ui64 seqNo, TSizedUnboxedValue&& leftRow)
1051+
: Result(result)
1052+
, HolderFactory(holderFactory)
1053+
, RowSeqNo(seqNo)
1054+
, LeftRow(std::move(leftRow))
1055+
{
1056+
}
1057+
1058+
std::deque<TResultRow>* Result;
1059+
const NMiniKQL::THolderFactory& HolderFactory;
10361060
std::deque<TResultRow> Rows;
10371061
std::unordered_set<ui64> PendingJoinKeys;
10381062
ui32 FirstUnprocessedRow = 0;
@@ -1048,21 +1072,19 @@ class TKqpJoinRows : public TKqpStreamLookupWorker {
10481072
return LastRow && PendingJoinKeys.empty();
10491073
}
10501074

1051-
void OnJoinKeyFinished(const NMiniKQL::THolderFactory& HolderFactory, ui64 joinKeyId) {
1075+
void OnJoinKeyFinished(ui64 joinKeyId) {
10521076
auto it = PendingJoinKeys.find(joinKeyId);
10531077
if (it != PendingJoinKeys.end()) {
10541078
PendingJoinKeys.erase(joinKeyId);
1055-
TryBuildResultRow(HolderFactory);
1079+
TryBuildResultRow();
10561080
}
10571081
}
10581082

10591083
void AddJoinKey(ui64 joinKeyId) {
10601084
PendingJoinKeys.emplace(joinKeyId);
10611085
}
10621086

1063-
void AcceptLeftRow(TSizedUnboxedValue&& leftRow, ui64 seqNo, bool firstRow, bool lastRow) {
1064-
RowSeqNo = seqNo;
1065-
LeftRow = std::move(leftRow);
1087+
void AcceptLeftRow(bool firstRow, bool lastRow) {
10661088
if (firstRow)
10671089
FirstRow = true;
10681090

@@ -1074,7 +1096,12 @@ class TKqpJoinRows : public TKqpStreamLookupWorker {
10741096
return Rows.empty() && FirstRow && LastRow && PendingJoinKeys.empty();
10751097
}
10761098

1077-
void TryBuildResultRow(const NMiniKQL::THolderFactory& HolderFactory, std::optional<TSizedUnboxedValue> row = {}) {
1099+
void FlushRows(std::deque<TResultRow>& externalStorage) {
1100+
externalStorage.insert(externalStorage.end(), Rows.begin(), Rows.end());
1101+
Rows.clear();
1102+
}
1103+
1104+
void TryBuildResultRow(std::optional<TSizedUnboxedValue> row = {}) {
10781105
if (RightRow.Data.HasValue() || ProcessedAllJoinKeys()) {
10791106
TReadResultStats rowStats;
10801107
NUdf::TUnboxedValue resultRow;
@@ -1094,7 +1121,11 @@ class TKqpJoinRows : public TKqpStreamLookupWorker {
10941121
rowStats.ResultBytesCount += LeftRow.ComputeSize + RightRow.ComputeSize;
10951122

10961123
++ProcessedRows;
1097-
Rows.emplace_back(std::move(resultRow), std::move(rowStats));
1124+
if (Result) {
1125+
Result->emplace_back(std::move(resultRow), std::move(rowStats));
1126+
} else {
1127+
Rows.emplace_back(std::move(resultRow), std::move(rowStats));
1128+
}
10981129
}
10991130

11001131
if (row.has_value()) {
@@ -1111,15 +1142,6 @@ class TKqpJoinRows : public TKqpStreamLookupWorker {
11111142
return Settings.KeepRowsOrder;
11121143
}
11131144

1114-
bool IsRowSeqNoValid(const ui64& seqNo) const {
1115-
if (!KeepRowsOrder()) {
1116-
return true;
1117-
}
1118-
1119-
// we should check row seqNo only if we need to keep the order
1120-
return seqNo >= CurrentResultSeqNo;
1121-
}
1122-
11231145
void FillReadRequest(ui64 readId, THolder<TEvDataShard::TEvRead>& request, const std::vector<TOwnedTableRange>& ranges) {
11241146
auto& record = request->Record;
11251147

@@ -1210,6 +1232,7 @@ class TKqpJoinRows : public TKqpStreamLookupWorker {
12101232
std::unordered_map<ui64, TReadState> ReadStateByReadId;
12111233
absl::flat_hash_map<TOwnedCellVec, TJoinKeyInfo, NKikimr::TCellVectorsHash, NKikimr::TCellVectorsEquals> PendingLeftRowsByKey;
12121234
std::unordered_map<ui64, TResultBatch> ResultRowsBySeqNo;
1235+
std::deque<TResultBatch::TResultRow> FlushedResultRows;
12131236
ui64 InputRowSeqNo = 0;
12141237
ui64 InputRowSeqNoLast = 0;
12151238
ui64 JoinKeySeqNo = 0;

0 commit comments

Comments (0)