@@ -577,6 +577,7 @@ class TKqpJoinRows : public TKqpStreamLookupWorker {
577577 if (!IsKeyAllowed (cellVec)) {
578578 ResultRowsBySeqNo[rowSeqNo].AddJoinKey (joinKeyId);
579579 ResultRowsBySeqNo[rowSeqNo].OnJoinKeyFinished (HolderFactory, joinKeyId);
580+ FlushRowsIfNeeded (rowSeqNo);
580581 } else {
581582 auto [it, success] = PendingLeftRowsByKey.emplace (cellVec, TJoinKeyInfo (joinKeyId));
582583 if (success) {
@@ -766,6 +767,7 @@ class TKqpJoinRows : public TKqpStreamLookupWorker {
766767 YQL_ENSURE (it != PendingLeftRowsByKey.end ());
767768 for (ui64 seqNo : it->second .ResultSeqNos ) {
768769 ResultRowsBySeqNo.at (seqNo).OnJoinKeyFinished (HolderFactory, it->second .JoinKeyId );
770+ FlushRowsIfNeeded (seqNo);
769771 }
770772 PendingLeftRowsByKey.erase (it);
771773 }
@@ -814,13 +816,9 @@ class TKqpJoinRows : public TKqpStreamLookupWorker {
814816 }
815817
// Reports whether there are result rows ready to be handed out.
// New version (lines marked '+'): calls OrderedFlushRows() and then answers
// purely from FlushedResultRows being non-empty.
// Old version (lines marked '-'): looked up the batch for CurrentResultSeqNo
// (or the first batch when row order is not kept) and checked whether its
// Rows container was non-empty.
816818 bool HasPendingResults () final {
817- auto nextSeqNo = KeepRowsOrder () ? ResultRowsBySeqNo. find (CurrentResultSeqNo) : ResultRowsBySeqNo. begin ();
// NOTE(review): this call makes the query mutate state (it may move rows into
// FlushedResultRows) before the check below.
819+ OrderedFlushRows ();
818820
819- if (nextSeqNo != ResultRowsBySeqNo.end () && !nextSeqNo->second .Rows .empty ()) {
820- return true ;
821- }
822-
823- return false ;
821+ return !FlushedResultRows.empty ();
824822 }
825823
826824 void AddResult (TShardReadResult result) final {
@@ -848,7 +846,7 @@ class TKqpJoinRows : public TKqpStreamLookupWorker {
848846 for (auto seqNo: leftRowIt->second .ResultSeqNos ) {
849847 auto & resultRows = ResultRowsBySeqNo.at (seqNo);
850848 resultRows.TryBuildResultRow (HolderFactory, rightRow);
851- YQL_ENSURE ( IsRowSeqNoValid ( seqNo) );
849+ FlushRowsIfNeeded ( seqNo);
852850 }
853851 }
854852
@@ -863,6 +861,7 @@ class TKqpJoinRows : public TKqpStreamLookupWorker {
863861 if (leftRowIt->second .FinishReadId (record.GetReadId ())) {
864862 for (ui64 seqNo : leftRowIt->second .ResultSeqNos ) {
865863 ResultRowsBySeqNo.at (seqNo).OnJoinKeyFinished (HolderFactory, leftRowIt->second .JoinKeyId );
864+ FlushRowsIfNeeded (seqNo);
866865 }
867866
868867 PendingLeftRowsByKey.erase (leftRowIt);
@@ -879,7 +878,8 @@ class TKqpJoinRows : public TKqpStreamLookupWorker {
879878 && UnprocessedKeys.empty ()
880879 && ReadStateByReadId.empty ()
881880 && ResultRowsBySeqNo.empty ()
882- && PendingLeftRowsByKey.empty ();
881+ && PendingLeftRowsByKey.empty ()
882+ && FlushedResultRows.empty ();
883883 }
884884
885885 void ResetRowsProcessing (ui64 readId) final {
@@ -920,45 +920,37 @@ class TKqpJoinRows : public TKqpStreamLookupWorker {
920920 ReadStateByReadId.erase (readIt);
921921 }
922922
923- TReadResultStats ReplyResult (NKikimr::NMiniKQL::TUnboxedValueBatch& batch, i64 freeSpace) final {
924- TReadResultStats resultStats;
925- batch.clear ();
926-
927- auto getNextResult = [&]() {
928- if (!KeepRowsOrder ()) {
929- return ResultRowsBySeqNo.begin ();
930- }
931-
932- return ResultRowsBySeqNo.find (CurrentResultSeqNo);
933- };
923+ void OrderedFlushRows () {
924+ if (!KeepRowsOrder ()) {
925+ return ;
926+ }
934927
935- while (!resultStats.SizeLimitExceeded ) {
936- auto resultIt = getNextResult ();
937- if (resultIt == ResultRowsBySeqNo.end ()) {
928+ while (true ) {
929+ ui64 currentSeqNo = CurrentResultSeqNo;
930+ FlushRowsIfNeeded (currentSeqNo);
931+ if (currentSeqNo == CurrentResultSeqNo) {
938932 break ;
939933 }
934+ }
935+ }
940936
941- auto & result = resultIt->second ;
942-
943- while (!result.Rows .empty ()) {
944- TResultBatch::TResultRow& row = result.Rows .front ();
937+ TReadResultStats ReplyResult (NKikimr::NMiniKQL::TUnboxedValueBatch& batch, i64 freeSpace) final {
938+ TReadResultStats resultStats;
939+ batch.clear ();
945940
946- if (resultStats.ResultRowsCount && resultStats.ResultBytesCount + row.Stats .ResultBytesCount > (ui64)freeSpace) {
947- resultStats.SizeLimitExceeded = true ;
948- break ;
949- }
941+ OrderedFlushRows ();
950942
951- batch.emplace_back (std::move (row.Data ));
952- result.Rows .pop_front ();
953- resultStats.Add (row.Stats );
954- }
943+ while (!resultStats.SizeLimitExceeded && !FlushedResultRows.empty ()) {
944+ TResultBatch::TResultRow& row = FlushedResultRows.front ();
955945
956- if (result.Completed ()) {
957- ResultRowsBySeqNo.erase (resultIt);
958- ++CurrentResultSeqNo;
959- } else {
946+ if (resultStats.ResultRowsCount && resultStats.ResultBytesCount + row.Stats .ResultBytesCount > (ui64)freeSpace) {
947+ resultStats.SizeLimitExceeded = true ;
960948 break ;
961949 }
950+
951+ batch.emplace_back (std::move (row.Data ));
952+ FlushedResultRows.pop_front ();
953+ resultStats.Add (row.Stats );
962954 }
963955
964956 return resultStats;
@@ -972,6 +964,7 @@ class TKqpJoinRows : public TKqpStreamLookupWorker {
972964 UnprocessedRows.clear ();
973965 PendingLeftRowsByKey.clear ();
974966 ResultRowsBySeqNo.clear ();
967+ FlushedResultRows.clear ();
975968 }
976969private:
977970 struct TJoinKeyInfo {
@@ -1075,6 +1068,11 @@ class TKqpJoinRows : public TKqpStreamLookupWorker {
10751068 return Rows.empty () && FirstRow && LastRow && PendingJoinKeys.empty ();
10761069 }
10771070
1071+ void FlushRows (std::deque<TResultRow>& externalStorage) {
1072+ externalStorage.insert (externalStorage.end (), Rows.begin (), Rows.end ());
1073+ Rows.clear ();
1074+ }
1075+
10781076 void TryBuildResultRow (const NMiniKQL::THolderFactory& HolderFactory, std::optional<TSizedUnboxedValue> row = {}) {
10791077 if (RightRow.Data .HasValue () || ProcessedAllJoinKeys ()) {
10801078 TReadResultStats rowStats;
@@ -1112,13 +1110,22 @@ class TKqpJoinRows : public TKqpStreamLookupWorker {
11121110 return Settings.KeepRowsOrder ;
11131111 }
11141112
// Old helper (lines marked '-'): IsRowSeqNoValid returned true when row order
// is not kept, otherwise seqNo >= CurrentResultSeqNo.
//
// New helper (lines marked '+'): FlushRowsIfNeeded moves the rows of the batch
// identified by seqNo into FlushedResultRows.
//  - When row order is kept, seqNo must not be behind CurrentResultSeqNo
//    (asserted), and only the batch equal to CurrentResultSeqNo is flushed;
//    any later batch is left untouched.
//  - When row order is not kept, the batch is flushed unconditionally.
// NOTE(review): the lookup below asserts that the batch exists, so every caller
// has to pass a seqNo that is present in ResultRowsBySeqNo.
1115- bool IsRowSeqNoValid (const ui64& seqNo) const {
1116- if (!KeepRowsOrder ()) {
1117- return true ;
1113+ void FlushRowsIfNeeded (const ui64& seqNo) {
1114+ if (KeepRowsOrder ()) {
1115+ YQL_ENSURE (seqNo >= CurrentResultSeqNo);
1116+ if (seqNo != CurrentResultSeqNo) {
1117+ return ;
1118+ }
11181119 }
11191120
1120- // we should check row seqNo only if we need to keep the order
1121- return seqNo >= CurrentResultSeqNo;
1121+ auto it = ResultRowsBySeqNo.find (seqNo);
1122+ YQL_ENSURE (it != ResultRowsBySeqNo.end ());
1123+ auto & result = it->second ;
1124+ result.FlushRows (FlushedResultRows);
// A completed batch is dropped and the expected sequence number advances.
// The increment happens in both ordered and unordered mode.
1125+ if (result.Completed ()) {
1126+ ResultRowsBySeqNo.erase (it);
1127+ ++CurrentResultSeqNo;
1128+ }
11221129 }
11231130
11241131 void FillReadRequest (ui64 readId, THolder<TEvDataShard::TEvRead>& request, const std::vector<TOwnedTableRange>& ranges) {
@@ -1211,6 +1218,7 @@ class TKqpJoinRows : public TKqpStreamLookupWorker {
12111218 std::unordered_map<ui64, TReadState> ReadStateByReadId;
12121219 absl::flat_hash_map<TOwnedCellVec, TJoinKeyInfo, NKikimr::TCellVectorsHash, NKikimr::TCellVectorsEquals> PendingLeftRowsByKey;
12131220 std::unordered_map<ui64, TResultBatch> ResultRowsBySeqNo;
1221+ std::deque<TResultBatch::TResultRow> FlushedResultRows;
12141222 ui64 InputRowSeqNo = 0 ;
12151223 ui64 InputRowSeqNoLast = 0 ;
12161224 ui64 JoinKeySeqNo = 0 ;
0 commit comments