Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
139 changes: 81 additions & 58 deletions ydb/core/kqp/runtime/kqp_stream_lookup_worker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -572,17 +572,23 @@ class TKqpJoinRows : public TKqpStreamLookupWorker {
row.ComputeSize = NYql::NDq::TDqDataSerializer::EstimateSize(row.Data, GetLeftRowType());
ui64 joinKeyId = JoinKeySeqNo++;
TOwnedCellVec cellVec(std::move(joinKeyCells));
ResultRowsBySeqNo[rowSeqNo].AcceptLeftRow(std::move(row), rowSeqNo, firstRow, lastRow);
auto [resIt, _] = ResultRowsBySeqNo.emplace(
rowSeqNo, TResultBatch(KeepRowsOrder() ? nullptr : &FlushedResultRows, HolderFactory, rowSeqNo, std::move(row)));
resIt->second.AcceptLeftRow(firstRow, lastRow);
if (!IsKeyAllowed(cellVec)) {
ResultRowsBySeqNo[rowSeqNo].AddJoinKey(joinKeyId);
ResultRowsBySeqNo[rowSeqNo].OnJoinKeyFinished(HolderFactory, joinKeyId);
resIt->second.AddJoinKey(joinKeyId);
resIt->second.OnJoinKeyFinished(joinKeyId);
if (resIt->second.Completed()) {
ResultRowsBySeqNo.erase(resIt);
}

} else {
auto [it, success] = PendingLeftRowsByKey.emplace(cellVec, TJoinKeyInfo(joinKeyId));
if (success) {
UnprocessedRows.emplace_back(TUnprocessedLeftRow(cellVec));
}
it->second.ResultSeqNos.push_back(rowSeqNo);
ResultRowsBySeqNo[rowSeqNo].AddJoinKey(it->second.JoinKeyId);
resIt->second.AddJoinKey(it->second.JoinKeyId);
}
}

Expand Down Expand Up @@ -764,7 +770,12 @@ class TKqpJoinRows : public TKqpStreamLookupWorker {
auto it = PendingLeftRowsByKey.find(unprocessedRow.JoinKey);
YQL_ENSURE(it != PendingLeftRowsByKey.end());
for(ui64 seqNo : it->second.ResultSeqNos) {
ResultRowsBySeqNo.at(seqNo).OnJoinKeyFinished(HolderFactory, it->second.JoinKeyId);
auto resultRowsIt = ResultRowsBySeqNo.find(seqNo);
YQL_ENSURE(resultRowsIt != ResultRowsBySeqNo.end());
resultRowsIt->second.OnJoinKeyFinished(it->second.JoinKeyId);
if (resultRowsIt->second.Completed()) {
ResultRowsBySeqNo.erase(resultRowsIt);
}
}
PendingLeftRowsByKey.erase(it);
}
Expand Down Expand Up @@ -813,13 +824,9 @@ class TKqpJoinRows : public TKqpStreamLookupWorker {
}

bool HasPendingResults() final {
auto nextSeqNo = KeepRowsOrder() ? ResultRowsBySeqNo.find(CurrentResultSeqNo) : ResultRowsBySeqNo.begin();

if (nextSeqNo != ResultRowsBySeqNo.end() && !nextSeqNo->second.Rows.empty()) {
return true;
}
OrderedFlushRows();

return false;
return !FlushedResultRows.empty();
}

void AddResult(TShardReadResult result) final {
Expand All @@ -846,8 +853,7 @@ class TKqpJoinRows : public TKqpStreamLookupWorker {
auto rightRow = leftRowIt->second.AttachRightRow(HolderFactory, Settings, ReadColumns, row);
for(auto seqNo: leftRowIt->second.ResultSeqNos) {
auto& resultRows = ResultRowsBySeqNo.at(seqNo);
resultRows.TryBuildResultRow(HolderFactory, rightRow);
YQL_ENSURE(IsRowSeqNoValid(seqNo));
resultRows.TryBuildResultRow(rightRow);
}
}

Expand All @@ -861,7 +867,12 @@ class TKqpJoinRows : public TKqpStreamLookupWorker {
YQL_ENSURE(leftRowIt != PendingLeftRowsByKey.end());
if (leftRowIt->second.FinishReadId(record.GetReadId())) {
for(ui64 seqNo : leftRowIt->second.ResultSeqNos) {
ResultRowsBySeqNo.at(seqNo).OnJoinKeyFinished(HolderFactory, leftRowIt->second.JoinKeyId);
auto resultRowsIt = ResultRowsBySeqNo.find(seqNo);
YQL_ENSURE(resultRowsIt != ResultRowsBySeqNo.end());
resultRowsIt->second.OnJoinKeyFinished(leftRowIt->second.JoinKeyId);
if (resultRowsIt->second.Completed()) {
ResultRowsBySeqNo.erase(resultRowsIt);
}
}

PendingLeftRowsByKey.erase(leftRowIt);
Expand All @@ -878,7 +889,8 @@ class TKqpJoinRows : public TKqpStreamLookupWorker {
&& UnprocessedKeys.empty()
&& ReadStateByReadId.empty()
&& ResultRowsBySeqNo.empty()
&& PendingLeftRowsByKey.empty();
&& PendingLeftRowsByKey.empty()
&& FlushedResultRows.empty();
}

void ResetRowsProcessing(ui64 readId) final {
Expand Down Expand Up @@ -919,45 +931,46 @@ class TKqpJoinRows : public TKqpStreamLookupWorker {
ReadStateByReadId.erase(readIt);
}

TReadResultStats ReplyResult(NKikimr::NMiniKQL::TUnboxedValueBatch& batch, i64 freeSpace) final {
TReadResultStats resultStats;
batch.clear();
void OrderedFlushRows() {
if (!KeepRowsOrder()) {
return;
}

auto getNextResult = [&]() {
if (!KeepRowsOrder()) {
return ResultRowsBySeqNo.begin();
while(true) {
auto it = ResultRowsBySeqNo.find(CurrentResultSeqNo);
if (it == ResultRowsBySeqNo.end()) {
break;
}

return ResultRowsBySeqNo.find(CurrentResultSeqNo);
};

while (!resultStats.SizeLimitExceeded) {
auto resultIt = getNextResult();
if (resultIt == ResultRowsBySeqNo.end()) {
it->second.FlushRows(FlushedResultRows);
if (!it->second.Completed()) {
break;
}

auto& result = resultIt->second;
++CurrentResultSeqNo;
ResultRowsBySeqNo.erase(it);
}

while(!result.Rows.empty()) {
TResultBatch::TResultRow& row = result.Rows.front();
return;
}

if (resultStats.ResultRowsCount && resultStats.ResultBytesCount + row.Stats.ResultBytesCount > (ui64)freeSpace) {
resultStats.SizeLimitExceeded = true;
break;
}
TReadResultStats ReplyResult(NKikimr::NMiniKQL::TUnboxedValueBatch& batch, i64 freeSpace) final {
TReadResultStats resultStats;
batch.clear();

batch.emplace_back(std::move(row.Data));
result.Rows.pop_front();
resultStats.Add(row.Stats);
}
OrderedFlushRows();

if (result.Completed()) {
ResultRowsBySeqNo.erase(resultIt);
++CurrentResultSeqNo;
} else {
while (!resultStats.SizeLimitExceeded && !FlushedResultRows.empty()) {
TResultBatch::TResultRow& row = FlushedResultRows.front();

if (resultStats.ResultRowsCount && resultStats.ResultBytesCount + row.Stats.ResultBytesCount > (ui64)freeSpace) {
resultStats.SizeLimitExceeded = true;
break;
}

batch.emplace_back(std::move(row.Data));
FlushedResultRows.pop_front();
resultStats.Add(row.Stats);
}

return resultStats;
Expand All @@ -971,6 +984,7 @@ class TKqpJoinRows : public TKqpStreamLookupWorker {
UnprocessedRows.clear();
PendingLeftRowsByKey.clear();
ResultRowsBySeqNo.clear();
FlushedResultRows.clear();
}
private:
struct TJoinKeyInfo {
Expand Down Expand Up @@ -1033,6 +1047,16 @@ class TKqpJoinRows : public TKqpStreamLookupWorker {
TReadResultStats Stats;
};

explicit TResultBatch(std::deque<TResultRow>* result, const NMiniKQL::THolderFactory& holderFactory, ui64 seqNo, TSizedUnboxedValue&& leftRow)
: Result(result)
, HolderFactory(holderFactory)
, RowSeqNo(seqNo)
, LeftRow(std::move(leftRow))
{
}

std::deque<TResultRow>* Result;
const NMiniKQL::THolderFactory& HolderFactory;
std::deque<TResultRow> Rows;
std::unordered_set<ui64> PendingJoinKeys;
ui32 FirstUnprocessedRow = 0;
Expand All @@ -1048,21 +1072,19 @@ class TKqpJoinRows : public TKqpStreamLookupWorker {
return LastRow && PendingJoinKeys.empty();
}

void OnJoinKeyFinished(const NMiniKQL::THolderFactory& HolderFactory, ui64 joinKeyId) {
void OnJoinKeyFinished(ui64 joinKeyId) {
auto it = PendingJoinKeys.find(joinKeyId);
if (it != PendingJoinKeys.end()) {
PendingJoinKeys.erase(joinKeyId);
TryBuildResultRow(HolderFactory);
TryBuildResultRow();
}
}

void AddJoinKey(ui64 joinKeyId) {
PendingJoinKeys.emplace(joinKeyId);
}

void AcceptLeftRow(TSizedUnboxedValue&& leftRow, ui64 seqNo, bool firstRow, bool lastRow) {
RowSeqNo = seqNo;
LeftRow = std::move(leftRow);
void AcceptLeftRow(bool firstRow, bool lastRow) {
if (firstRow)
FirstRow = true;

Expand All @@ -1074,7 +1096,12 @@ class TKqpJoinRows : public TKqpStreamLookupWorker {
return Rows.empty() && FirstRow && LastRow && PendingJoinKeys.empty();
}

void TryBuildResultRow(const NMiniKQL::THolderFactory& HolderFactory, std::optional<TSizedUnboxedValue> row = {}) {
void FlushRows(std::deque<TResultRow>& externalStorage) {
externalStorage.insert(externalStorage.end(), Rows.begin(), Rows.end());
Rows.clear();
}

void TryBuildResultRow(std::optional<TSizedUnboxedValue> row = {}) {
if (RightRow.Data.HasValue() || ProcessedAllJoinKeys()) {
TReadResultStats rowStats;
NUdf::TUnboxedValue resultRow;
Expand All @@ -1094,7 +1121,11 @@ class TKqpJoinRows : public TKqpStreamLookupWorker {
rowStats.ResultBytesCount += LeftRow.ComputeSize + RightRow.ComputeSize;

++ProcessedRows;
Rows.emplace_back(std::move(resultRow), std::move(rowStats));
if (Result) {
Result->emplace_back(std::move(resultRow), std::move(rowStats));
} else {
Rows.emplace_back(std::move(resultRow), std::move(rowStats));
}
}

if (row.has_value()) {
Expand All @@ -1111,15 +1142,6 @@ class TKqpJoinRows : public TKqpStreamLookupWorker {
return Settings.KeepRowsOrder;
}

bool IsRowSeqNoValid(const ui64& seqNo) const {
if (!KeepRowsOrder()) {
return true;
}

// we should check row seqNo only if we need to keep the order
return seqNo >= CurrentResultSeqNo;
}

void FillReadRequest(ui64 readId, THolder<TEvDataShard::TEvRead>& request, const std::vector<TOwnedTableRange>& ranges) {
auto& record = request->Record;

Expand Down Expand Up @@ -1210,6 +1232,7 @@ class TKqpJoinRows : public TKqpStreamLookupWorker {
std::unordered_map<ui64, TReadState> ReadStateByReadId;
absl::flat_hash_map<TOwnedCellVec, TJoinKeyInfo, NKikimr::TCellVectorsHash, NKikimr::TCellVectorsEquals> PendingLeftRowsByKey;
std::unordered_map<ui64, TResultBatch> ResultRowsBySeqNo;
std::deque<TResultBatch::TResultRow> FlushedResultRows;
ui64 InputRowSeqNo = 0;
ui64 InputRowSeqNoLast = 0;
ui64 JoinKeySeqNo = 0;
Expand Down
Loading