From 0a0ff045b4befa200fb70dca317e3221067d840f Mon Sep 17 00:00:00 2001 From: Vitalii Gridnev Date: Sat, 22 Nov 2025 02:51:37 +0300 Subject: [PATCH] quicker result rows to avoid potential overload --- .../kqp/runtime/kqp_stream_lookup_worker.cpp | 139 ++++++++++-------- 1 file changed, 81 insertions(+), 58 deletions(-) diff --git a/ydb/core/kqp/runtime/kqp_stream_lookup_worker.cpp b/ydb/core/kqp/runtime/kqp_stream_lookup_worker.cpp index d50dd4e78c33..462e3d0e5e95 100644 --- a/ydb/core/kqp/runtime/kqp_stream_lookup_worker.cpp +++ b/ydb/core/kqp/runtime/kqp_stream_lookup_worker.cpp @@ -572,17 +572,23 @@ class TKqpJoinRows : public TKqpStreamLookupWorker { row.ComputeSize = NYql::NDq::TDqDataSerializer::EstimateSize(row.Data, GetLeftRowType()); ui64 joinKeyId = JoinKeySeqNo++; TOwnedCellVec cellVec(std::move(joinKeyCells)); - ResultRowsBySeqNo[rowSeqNo].AcceptLeftRow(std::move(row), rowSeqNo, firstRow, lastRow); + auto [resIt, _] = ResultRowsBySeqNo.emplace( + rowSeqNo, TResultBatch(KeepRowsOrder() ? nullptr : &FlushedResultRows, HolderFactory, rowSeqNo, std::move(row))); + resIt->second.AcceptLeftRow(firstRow, lastRow); if (!IsKeyAllowed(cellVec)) { - ResultRowsBySeqNo[rowSeqNo].AddJoinKey(joinKeyId); - ResultRowsBySeqNo[rowSeqNo].OnJoinKeyFinished(HolderFactory, joinKeyId); + resIt->second.AddJoinKey(joinKeyId); + resIt->second.OnJoinKeyFinished(joinKeyId); + if (resIt->second.Completed()) { + ResultRowsBySeqNo.erase(resIt); + } + } else { auto [it, success] = PendingLeftRowsByKey.emplace(cellVec, TJoinKeyInfo(joinKeyId)); if (success) { UnprocessedRows.emplace_back(TUnprocessedLeftRow(cellVec)); } it->second.ResultSeqNos.push_back(rowSeqNo); - ResultRowsBySeqNo[rowSeqNo].AddJoinKey(it->second.JoinKeyId); + resIt->second.AddJoinKey(it->second.JoinKeyId); } } @@ -764,7 +770,12 @@ class TKqpJoinRows : public TKqpStreamLookupWorker { auto it = PendingLeftRowsByKey.find(unprocessedRow.JoinKey); YQL_ENSURE(it != PendingLeftRowsByKey.end()); for(ui64 seqNo : it->second.ResultSeqNos) { - ResultRowsBySeqNo.at(seqNo).OnJoinKeyFinished(HolderFactory, it->second.JoinKeyId); + auto resultRowsIt = ResultRowsBySeqNo.find(seqNo); + YQL_ENSURE(resultRowsIt != ResultRowsBySeqNo.end()); + resultRowsIt->second.OnJoinKeyFinished(it->second.JoinKeyId); + if (resultRowsIt->second.Completed()) { + ResultRowsBySeqNo.erase(resultRowsIt); + } } PendingLeftRowsByKey.erase(it); } @@ -813,13 +824,9 @@ class TKqpJoinRows : public TKqpStreamLookupWorker { } bool HasPendingResults() final { - auto nextSeqNo = KeepRowsOrder() ? ResultRowsBySeqNo.find(CurrentResultSeqNo) : ResultRowsBySeqNo.begin(); - - if (nextSeqNo != ResultRowsBySeqNo.end() && !nextSeqNo->second.Rows.empty()) { - return true; - } + OrderedFlushRows(); - return false; + return !FlushedResultRows.empty(); } void AddResult(TShardReadResult result) final { @@ -846,8 +853,7 @@ class TKqpJoinRows : public TKqpStreamLookupWorker { auto rightRow = leftRowIt->second.AttachRightRow(HolderFactory, Settings, ReadColumns, row); for(auto seqNo: leftRowIt->second.ResultSeqNos) { auto& resultRows = ResultRowsBySeqNo.at(seqNo); - resultRows.TryBuildResultRow(HolderFactory, rightRow); - YQL_ENSURE(IsRowSeqNoValid(seqNo)); + resultRows.TryBuildResultRow(rightRow); } } @@ -861,7 +867,12 @@ class TKqpJoinRows : public TKqpStreamLookupWorker { YQL_ENSURE(leftRowIt != PendingLeftRowsByKey.end()); if (leftRowIt->second.FinishReadId(record.GetReadId())) { for(ui64 seqNo : leftRowIt->second.ResultSeqNos) { - ResultRowsBySeqNo.at(seqNo).OnJoinKeyFinished(HolderFactory, leftRowIt->second.JoinKeyId); + auto resultRowsIt = ResultRowsBySeqNo.find(seqNo); + YQL_ENSURE(resultRowsIt != ResultRowsBySeqNo.end()); + resultRowsIt->second.OnJoinKeyFinished(leftRowIt->second.JoinKeyId); + if (resultRowsIt->second.Completed()) { + ResultRowsBySeqNo.erase(resultRowsIt); + } } PendingLeftRowsByKey.erase(leftRowIt); @@ -878,7 +889,8 @@ class TKqpJoinRows : public TKqpStreamLookupWorker { && UnprocessedKeys.empty() && ReadStateByReadId.empty() && ResultRowsBySeqNo.empty() - && PendingLeftRowsByKey.empty(); + && PendingLeftRowsByKey.empty() + && FlushedResultRows.empty(); } void ResetRowsProcessing(ui64 readId) final { @@ -919,45 +931,46 @@ class TKqpJoinRows : public TKqpStreamLookupWorker { ReadStateByReadId.erase(readIt); } - TReadResultStats ReplyResult(NKikimr::NMiniKQL::TUnboxedValueBatch& batch, i64 freeSpace) final { - TReadResultStats resultStats; - batch.clear(); + void OrderedFlushRows() { + if (!KeepRowsOrder()) { + return; + } - auto getNextResult = [&]() { - if (!KeepRowsOrder()) { - return ResultRowsBySeqNo.begin(); + while(true) { + auto it = ResultRowsBySeqNo.find(CurrentResultSeqNo); + if (it == ResultRowsBySeqNo.end()) { + break; } - return ResultRowsBySeqNo.find(CurrentResultSeqNo); - }; - - while (!resultStats.SizeLimitExceeded) { - auto resultIt = getNextResult(); - if (resultIt == ResultRowsBySeqNo.end()) { + it->second.FlushRows(FlushedResultRows); + if (!it->second.Completed()) { break; } - auto& result = resultIt->second; + ++CurrentResultSeqNo; + ResultRowsBySeqNo.erase(it); + } - while(!result.Rows.empty()) { - TResultBatch::TResultRow& row = result.Rows.front(); + return; + } - if (resultStats.ResultRowsCount && resultStats.ResultBytesCount + row.Stats.ResultBytesCount > (ui64)freeSpace) { - resultStats.SizeLimitExceeded = true; - break; - } + TReadResultStats ReplyResult(NKikimr::NMiniKQL::TUnboxedValueBatch& batch, i64 freeSpace) final { + TReadResultStats resultStats; + batch.clear(); - batch.emplace_back(std::move(row.Data)); - result.Rows.pop_front(); - resultStats.Add(row.Stats); - } + OrderedFlushRows(); - if (result.Completed()) { - ResultRowsBySeqNo.erase(resultIt); - ++CurrentResultSeqNo; - } else { + while (!resultStats.SizeLimitExceeded && !FlushedResultRows.empty()) { + TResultBatch::TResultRow& row = FlushedResultRows.front(); + + if (resultStats.ResultRowsCount && resultStats.ResultBytesCount + row.Stats.ResultBytesCount > (ui64)freeSpace) { + resultStats.SizeLimitExceeded = true; break; } + + batch.emplace_back(std::move(row.Data)); + FlushedResultRows.pop_front(); + resultStats.Add(row.Stats); } return resultStats; @@ -971,6 +984,7 @@ class TKqpJoinRows : public TKqpStreamLookupWorker { UnprocessedRows.clear(); PendingLeftRowsByKey.clear(); ResultRowsBySeqNo.clear(); + FlushedResultRows.clear(); } private: struct TJoinKeyInfo { @@ -1033,6 +1047,16 @@ class TKqpJoinRows : public TKqpStreamLookupWorker { TReadResultStats Stats; }; + explicit TResultBatch(std::deque* result, const NMiniKQL::THolderFactory& holderFactory, ui64 seqNo, TSizedUnboxedValue&& leftRow) + : Result(result) + , HolderFactory(holderFactory) + , RowSeqNo(seqNo) + , LeftRow(std::move(leftRow)) + { + } + + std::deque* Result; + const NMiniKQL::THolderFactory& HolderFactory; std::deque Rows; std::unordered_set PendingJoinKeys; ui32 FirstUnprocessedRow = 0; @@ -1048,11 +1072,11 @@ class TKqpJoinRows : public TKqpStreamLookupWorker { return LastRow && PendingJoinKeys.empty(); } - void OnJoinKeyFinished(const NMiniKQL::THolderFactory& HolderFactory, ui64 joinKeyId) { + void OnJoinKeyFinished(ui64 joinKeyId) { auto it = PendingJoinKeys.find(joinKeyId); if (it != PendingJoinKeys.end()) { PendingJoinKeys.erase(joinKeyId); - TryBuildResultRow(HolderFactory); + TryBuildResultRow(); } } @@ -1060,9 +1084,7 @@ class TKqpJoinRows : public TKqpStreamLookupWorker { PendingJoinKeys.emplace(joinKeyId); } - void AcceptLeftRow(TSizedUnboxedValue&& leftRow, ui64 seqNo, bool firstRow, bool lastRow) { - RowSeqNo = seqNo; - LeftRow = std::move(leftRow); + void AcceptLeftRow(bool firstRow, bool lastRow) { if (firstRow) FirstRow = true; @@ -1074,7 +1096,12 @@ class TKqpJoinRows : public TKqpStreamLookupWorker { return Rows.empty() && FirstRow && LastRow && PendingJoinKeys.empty(); } - void TryBuildResultRow(const NMiniKQL::THolderFactory& HolderFactory, std::optional row = {}) { + void FlushRows(std::deque& externalStorage) { + externalStorage.insert(externalStorage.end(), Rows.begin(), Rows.end()); + Rows.clear(); + } + + void TryBuildResultRow(std::optional row = {}) { if (RightRow.Data.HasValue() || ProcessedAllJoinKeys()) { TReadResultStats rowStats; NUdf::TUnboxedValue resultRow; @@ -1094,7 +1121,11 @@ class TKqpJoinRows : public TKqpStreamLookupWorker { rowStats.ResultBytesCount += LeftRow.ComputeSize + RightRow.ComputeSize; ++ProcessedRows; - Rows.emplace_back(std::move(resultRow), std::move(rowStats)); + if (Result) { + Result->emplace_back(std::move(resultRow), std::move(rowStats)); + } else { + Rows.emplace_back(std::move(resultRow), std::move(rowStats)); + } } if (row.has_value()) { @@ -1111,15 +1142,6 @@ class TKqpJoinRows : public TKqpStreamLookupWorker { return Settings.KeepRowsOrder; } - bool IsRowSeqNoValid(const ui64& seqNo) const { - if (!KeepRowsOrder()) { - return true; - } - - // we should check row seqNo only if we need to keep the order - return seqNo >= CurrentResultSeqNo; - } - void FillReadRequest(ui64 readId, THolder& request, const std::vector& ranges) { auto& record = request->Record; @@ -1210,6 +1232,7 @@ class TKqpJoinRows : public TKqpStreamLookupWorker { std::unordered_map ReadStateByReadId; absl::flat_hash_map PendingLeftRowsByKey; std::unordered_map ResultRowsBySeqNo; + std::deque FlushedResultRows; ui64 InputRowSeqNo = 0; ui64 InputRowSeqNoLast = 0; ui64 JoinKeySeqNo = 0;