@@ -576,6 +576,7 @@ class TKqpJoinRows : public TKqpStreamLookupWorker {
576576 if (!IsKeyAllowed (cellVec)) {
577577 ResultRowsBySeqNo[rowSeqNo].AddJoinKey (joinKeyId);
578578 ResultRowsBySeqNo[rowSeqNo].OnJoinKeyFinished (HolderFactory, joinKeyId);
579+ FlushRowsIfNeeded (rowSeqNo);
579580 } else {
580581 auto [it, success] = PendingLeftRowsByKey.emplace (cellVec, TJoinKeyInfo (joinKeyId));
581582 if (success) {
@@ -765,6 +766,7 @@ class TKqpJoinRows : public TKqpStreamLookupWorker {
765766 YQL_ENSURE (it != PendingLeftRowsByKey.end ());
766767 for (ui64 seqNo : it->second .ResultSeqNos ) {
767768 ResultRowsBySeqNo.at (seqNo).OnJoinKeyFinished (HolderFactory, it->second .JoinKeyId );
769+ FlushRowsIfNeeded (seqNo);
768770 }
769771 PendingLeftRowsByKey.erase (it);
770772 }
@@ -813,13 +815,9 @@ class TKqpJoinRows : public TKqpStreamLookupWorker {
813815 }
814816
// HasPendingResults (diff view: lines tagged `-` are the removed implementation, `+` the new one).
// New behavior: run OrderedFlushRows() and then report whether FlushedResultRows holds at least one row.
815817 bool HasPendingResults () final {
// Removed: picked the next batch from ResultRowsBySeqNo -- find(CurrentResultSeqNo) when KeepRowsOrder(), begin() otherwise.
816- auto nextSeqNo = KeepRowsOrder () ? ResultRowsBySeqNo. find (CurrentResultSeqNo) : ResultRowsBySeqNo. begin ();
// Added: in ordered mode this moves rows that have become deliverable into FlushedResultRows before the check below.
818+ OrderedFlushRows ();
817819
// Removed: answered true only when that batch existed and its Rows were non-empty.
818- if (nextSeqNo != ResultRowsBySeqNo.end () && !nextSeqNo->second .Rows .empty ()) {
819- return true ;
820- }
821-
822- return false ;
// Added: the answer now depends solely on the shared flushed-rows queue.
820+ return !FlushedResultRows.empty ();
823821 }
824822
825823 void AddResult (TShardReadResult result) final {
@@ -847,7 +845,7 @@ class TKqpJoinRows : public TKqpStreamLookupWorker {
847845 for (auto seqNo: leftRowIt->second .ResultSeqNos ) {
848846 auto & resultRows = ResultRowsBySeqNo.at (seqNo);
849847 resultRows.TryBuildResultRow (HolderFactory, rightRow);
850- YQL_ENSURE ( IsRowSeqNoValid ( seqNo) );
848+ FlushRowsIfNeeded ( seqNo);
851849 }
852850 }
853851
@@ -862,6 +860,7 @@ class TKqpJoinRows : public TKqpStreamLookupWorker {
862860 if (leftRowIt->second .FinishReadId (record.GetReadId ())) {
863861 for (ui64 seqNo : leftRowIt->second .ResultSeqNos ) {
864862 ResultRowsBySeqNo.at (seqNo).OnJoinKeyFinished (HolderFactory, leftRowIt->second .JoinKeyId );
863+ FlushRowsIfNeeded (seqNo);
865864 }
866865
867866 PendingLeftRowsByKey.erase (leftRowIt);
@@ -878,7 +877,8 @@ class TKqpJoinRows : public TKqpStreamLookupWorker {
878877 && UnprocessedKeys.empty ()
879878 && ReadStateByReadId.empty ()
880879 && ResultRowsBySeqNo.empty ()
881- && PendingLeftRowsByKey.empty ();
880+ && PendingLeftRowsByKey.empty ()
881+ && FlushedResultRows.empty ();
882882 }
883883
884884 void ResetRowsProcessing (ui64 readId) final {
@@ -919,45 +919,38 @@ class TKqpJoinRows : public TKqpStreamLookupWorker {
919919 ReadStateByReadId.erase (readIt);
920920 }
921921
922- TReadResultStats ReplyResult (NKikimr::NMiniKQL::TUnboxedValueBatch& batch, i64 freeSpace) final {
923- TReadResultStats resultStats;
924- batch.clear ();
925-
926- auto getNextResult = [&]() {
927- if (!KeepRowsOrder ()) {
928- return ResultRowsBySeqNo.begin ();
929- }
930-
931- return ResultRowsBySeqNo.find (CurrentResultSeqNo);
932- };
// OrderedFlushRows (added; the `-` lines interleaved below belong to the removed body of the old ReplyResult).
// When KeepRowsOrder() is true, flushes result batches into FlushedResultRows in sequence-number order,
// starting at CurrentResultSeqNo. Does nothing when row order does not have to be kept.
922+ void OrderedFlushRows () {
923+ if (!KeepRowsOrder ()) {
924+ return ;
925+ }
933926
934- while (!resultStats. SizeLimitExceeded ) {
935- auto resultIt = getNextResult ( );
936- if (resultIt == ResultRowsBySeqNo.end ()) {
// The loop condition is the termination logic: FlushRowsIfNeeded() increments CurrentResultSeqNo only when the
// batch it flushed is Completed() (and erases that batch). After ++currentSeqNo the two counters are equal
// again only in that case, so iteration stops at the first batch that is missing or not yet completed.
927+ for (ui64 currentSeqNo = CurrentResultSeqNo; currentSeqNo == CurrentResultSeqNo; ++currentSeqNo ) {
928+ auto it = ResultRowsBySeqNo. find (currentSeqNo );
929+ if (it == ResultRowsBySeqNo.end ()) {
// No batch registered under this sequence number yet -- nothing more can be flushed in order.
937930 break ;
938931 }
939932
940- auto & result = resultIt->second ;
// NOTE(review): FlushRowsIfNeeded() repeats the find() on ResultRowsBySeqNo; the lookup above only guards its YQL_ENSURE.
933+ FlushRowsIfNeeded (currentSeqNo);
934+ }
935+ }
941936
942- while (!result.Rows .empty ()) {
943- TResultBatch::TResultRow& row = result.Rows .front ();
// ReplyResult (new version; `-` lines are leftovers of the removed implementation that walked ResultRowsBySeqNo directly).
// Clears `batch`, runs OrderedFlushRows(), then moves rows from the front of FlushedResultRows into `batch`
// until the queue is empty or the next row would push the accumulated byte count above `freeSpace`.
// Returns the accumulated TReadResultStats; SizeLimitExceeded is set when the loop stopped because of `freeSpace`.
937+ TReadResultStats ReplyResult (NKikimr::NMiniKQL::TUnboxedValueBatch& batch, i64 freeSpace) final {
938+ TReadResultStats resultStats;
939+ batch.clear ();
944940
945- if (resultStats.ResultRowsCount && resultStats.ResultBytesCount + row.Stats .ResultBytesCount > (ui64)freeSpace) {
946- resultStats.SizeLimitExceeded = true ;
947- break ;
948- }
// Only has an effect when KeepRowsOrder() is true; otherwise rows reach FlushedResultRows via FlushRowsIfNeeded() calls elsewhere.
941+ OrderedFlushRows ();
949942
950- batch.emplace_back (std::move (row.Data ));
951- result.Rows .pop_front ();
952- resultStats.Add (row.Stats );
953- }
943+ while (!resultStats.SizeLimitExceeded && !FlushedResultRows.empty ()) {
944+ TResultBatch::TResultRow& row = FlushedResultRows.front ();
954945
955- if (result.Completed ()) {
956- ResultRowsBySeqNo.erase (resultIt);
957- ++CurrentResultSeqNo;
958- } else {
// The `ResultRowsCount &&` guard lets the first row through even if it alone is larger than freeSpace,
// so every call with a non-empty queue emits at least one row.
946+ if (resultStats.ResultRowsCount && resultStats.ResultBytesCount + row.Stats .ResultBytesCount > (ui64)freeSpace) {
947+ resultStats.SizeLimitExceeded = true ;
959948 break ;
960949 }
950+
// NOTE(review): `row` refers to FlushedResultRows.front(); pop_front() destroys that element, so the
// `row.Stats` read on the line after it goes through a dangling reference. Add the stats (or copy them)
// before popping. The removed code above had the same ordering.
951+ batch.emplace_back (std::move (row.Data ));
952+ FlushedResultRows.pop_front ();
953+ resultStats.Add (row.Stats );
961954 }
962955
963956 return resultStats;
@@ -971,6 +964,7 @@ class TKqpJoinRows : public TKqpStreamLookupWorker {
971964 UnprocessedRows.clear ();
972965 PendingLeftRowsByKey.clear ();
973966 ResultRowsBySeqNo.clear ();
967+ FlushedResultRows.clear ();
974968 }
975969private:
976970 struct TJoinKeyInfo {
@@ -1074,6 +1068,11 @@ class TKqpJoinRows : public TKqpStreamLookupWorker {
10741068 return Rows.empty () && FirstRow && LastRow && PendingJoinKeys.empty ();
10751069 }
10761070
// FlushRows (added): appends every row currently held in Rows to the end of `externalStorage`, keeping their
// order, and then empties Rows.
// NOTE(review): the iterator-range insert copy-constructs each row (assuming Rows is a standard container of
// TResultRow -- confirm); since Rows is cleared right afterwards, moving the elements would avoid the copies.
1071+ void FlushRows (std::deque<TResultRow>& externalStorage) {
1072+ externalStorage.insert (externalStorage.end (), Rows.begin (), Rows.end ());
1073+ Rows.clear ();
1074+ }
1075+
10771076 void TryBuildResultRow (const NMiniKQL::THolderFactory& HolderFactory, std::optional<TSizedUnboxedValue> row = {}) {
10781077 if (RightRow.Data .HasValue () || ProcessedAllJoinKeys ()) {
10791078 TReadResultStats rowStats;
@@ -1111,13 +1110,22 @@ class TKqpJoinRows : public TKqpStreamLookupWorker {
11111110 return Settings.KeepRowsOrder ;
11121111 }
11131112
1114- bool IsRowSeqNoValid (const ui64& seqNo) const {
1115- if (!KeepRowsOrder ()) {
1116- return true ;
// FlushRowsIfNeeded (added; replaces the removed IsRowSeqNoValid check, whose remaining `-` lines are interleaved below).
// Moves the rows built so far for batch `seqNo` into FlushedResultRows and drops the batch once it is completed.
// With KeepRowsOrder() enabled only the batch numbered CurrentResultSeqNo is flushed; later ones are left
// untouched until their turn, and a `seqNo` below CurrentResultSeqNo trips YQL_ENSURE.
1113+ void FlushRowsIfNeeded (const ui64& seqNo) {
1114+ if (KeepRowsOrder ()) {
1115+ YQL_ENSURE (seqNo >= CurrentResultSeqNo);
1116+ if (seqNo != CurrentResultSeqNo) {
// Not this batch's turn yet: its rows stay in ResultRowsBySeqNo.
1117+ return ;
1118+ }
11171119 }
11181120
1119- // we should check row seqNo only if we need to keep the order
1120- return seqNo >= CurrentResultSeqNo;
// The batch must already be registered; callers in this diff access ResultRowsBySeqNo for `seqNo` right before calling.
1121+ auto it = ResultRowsBySeqNo.find (seqNo);
1122+ YQL_ENSURE (it != ResultRowsBySeqNo.end ());
1123+ auto & result = it->second ;
1124+ result.FlushRows (FlushedResultRows);
// A completed batch is removed and the ordered cursor advances to the next sequence number.
// NOTE(review): the increment also runs when KeepRowsOrder() is false; in the code visible here
// CurrentResultSeqNo is only read in ordered mode, so this looks harmless -- confirm.
1125+ if (result.Completed ()) {
1126+ ResultRowsBySeqNo.erase (it);
1127+ ++CurrentResultSeqNo;
1128+ }
11211129 }
11221130
11231131 void FillReadRequest (ui64 readId, THolder<TEvDataShard::TEvRead>& request, const std::vector<TOwnedTableRange>& ranges) {
@@ -1210,6 +1218,7 @@ class TKqpJoinRows : public TKqpStreamLookupWorker {
12101218 std::unordered_map<ui64, TReadState> ReadStateByReadId;
12111219 absl::flat_hash_map<TOwnedCellVec, TJoinKeyInfo, NKikimr::TCellVectorsHash, NKikimr::TCellVectorsEquals> PendingLeftRowsByKey;
12121220 std::unordered_map<ui64, TResultBatch> ResultRowsBySeqNo;
1221+ std::deque<TResultBatch::TResultRow> FlushedResultRows;
12131222 ui64 InputRowSeqNo = 0 ;
12141223 ui64 InputRowSeqNoLast = 0 ;
12151224 ui64 JoinKeySeqNo = 0 ;
0 commit comments