Skip to content

Commit be9f866

Browse files
authored
stable-25-3: Support relaxed Snapshot isolation in Column Shards (#29461)
2 parents 0dcb1c2 + cf1a4c3 commit be9f866

File tree

24 files changed

+179
-53
lines changed

24 files changed

+179
-53
lines changed

ydb/core/kqp/runtime/kqp_write_actor.cpp

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1114,10 +1114,6 @@ class TKqpTableWriteActor : public TActorBootstrapped<TKqpTableWriteActor> {
11141114
evWrite->SetLockId(LockTxId, LockNodeId);
11151115
evWrite->Record.SetLockMode(LockMode);
11161116

1117-
if (LockMode == NKikimrDataEvents::OPTIMISTIC_SNAPSHOT_ISOLATION) {
1118-
YQL_ENSURE(MvccSnapshot);
1119-
}
1120-
11211117
if (MvccSnapshot) {
11221118
*evWrite->Record.MutableMvccSnapshot() = *MvccSnapshot;
11231119
}

ydb/core/kqp/session_actor/kqp_session_actor.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1058,6 +1058,7 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
10581058

10591059
if (QueryState->TxCtx->EffectiveIsolationLevel != NKikimrKqp::ISOLATION_LEVEL_SERIALIZABLE
10601060
&& QueryState->TxCtx->EffectiveIsolationLevel != NKikimrKqp::ISOLATION_LEVEL_SNAPSHOT_RO
1061+
&& QueryState->TxCtx->EffectiveIsolationLevel != NKikimrKqp::ISOLATION_LEVEL_SNAPSHOT_RW
10611062
&& QueryState->GetType() != NKikimrKqp::QUERY_TYPE_SQL_SCAN
10621063
&& QueryState->GetType() != NKikimrKqp::QUERY_TYPE_AST_SCAN
10631064
&& QueryState->GetType() != NKikimrKqp::QUERY_TYPE_SQL_GENERIC_SCRIPT

ydb/core/kqp/ut/tx/kqp_snapshot_isolation_ut.cpp

Lines changed: 58 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,6 @@ Y_UNIT_TEST_SUITE(KqpSnapshotIsolation) {
6060
}
6161

6262
Y_UNIT_TEST(TSimpleOlap) {
63-
return;
6463
TSimple tester;
6564
tester.SetIsOlap(true);
6665
tester.Execute();
@@ -175,7 +174,6 @@ Y_UNIT_TEST_SUITE(KqpSnapshotIsolation) {
175174
}
176175

177176
Y_UNIT_TEST(TConflictReadWriteOlap) {
178-
return;
179177
TConflictReadWrite tester;
180178
tester.SetIsOlap(true);
181179
tester.Execute();
@@ -233,11 +231,68 @@ Y_UNIT_TEST_SUITE(KqpSnapshotIsolation) {
233231
}
234232

235233
Y_UNIT_TEST(TReadOnlyOlap) {
236-
return;
237234
TReadOnly tester;
238235
tester.SetIsOlap(true);
239236
tester.Execute();
240237
}
238+
239+
class TReadOwnChanges : public TTableDataModificationTester {
240+
protected:
241+
void DoExecute() override {
242+
auto client = Kikimr->GetQueryClient();
243+
auto session1 = client.GetSession().GetValueSync().GetSession();
244+
245+
// tx1 reads KV2
246+
auto result = session1.ExecuteQuery(Q_(R"(
247+
SELECT * FROM `/Root/KV2`;
248+
)"), TTxControl::BeginTx(TTxSettings::SnapshotRW())).ExtractValueSync();
249+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
250+
CompareYson(R"([])", FormatResultSetYson(result.GetResultSet(0)));
251+
auto tx1 = result.GetTransaction();
252+
UNIT_ASSERT(tx1);
253+
254+
// tx1 upserts a row
255+
result = session1.ExecuteQuery(Q_(R"(
256+
UPSERT INTO `/Root/KV2` (Key, Value)
257+
VALUES (1U, "val1");
258+
)"), TTxControl::Tx(*tx1)).ExtractValueSync();
259+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
260+
tx1 = result.GetTransaction();
261+
262+
// tx1 reads KV2 and sees the row
263+
result = session1.ExecuteQuery(Q_(R"(
264+
SELECT * FROM `/Root/KV2`;
265+
)"), TTxControl::Tx(*tx1)).ExtractValueSync();
266+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
267+
CompareYson(R"([[1u;["val1"]]])", FormatResultSetYson(result.GetResultSet(0)));
268+
tx1 = result.GetTransaction();
269+
270+
// tx1 commits
271+
result = tx1->Commit().ExtractValueSync();
272+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
273+
}
274+
};
275+
276+
Y_UNIT_TEST(TReadOwnChangesOltp) {
277+
return;
278+
TReadOwnChanges tester;
279+
tester.SetIsOlap(false);
280+
tester.Execute();
281+
}
282+
283+
Y_UNIT_TEST(TReadOwnChangesOltpNoSink) {
284+
return;
285+
TReadOwnChanges tester;
286+
tester.SetIsOlap(false);
287+
tester.SetDisableSinks(true);
288+
tester.Execute();
289+
}
290+
291+
Y_UNIT_TEST(TReadOwnChangesOlap) {
292+
TReadOwnChanges tester;
293+
tester.SetIsOlap(true);
294+
tester.Execute();
295+
}
241296
}
242297

243298
} // namespace NKqp

ydb/core/tx/columnshard/engines/column_engine.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,7 @@ class IColumnEngine {
145145
}
146146
virtual bool IsOverloadedByMetadata(const ui64 limit) const = 0;
147147
virtual std::vector<std::shared_ptr<TPortionInfo>> Select(
148-
TInternalPathId pathId, TSnapshot snapshot, const TPKRangesFilter& pkRangesFilter, const bool withUncommitted) const = 0;
148+
TInternalPathId pathId, TSnapshot snapshot, const TPKRangesFilter& pkRangesFilter, const bool withNonconflicting, const bool withConflicting, const std::optional<THashSet<TInsertWriteId>>& ownPortions) const = 0;
149149
virtual std::vector<std::shared_ptr<TColumnEngineChanges>> StartCompaction(const std::shared_ptr<NDataLocks::TManager>& dataLocksManager) noexcept = 0;
150150
virtual ui64 GetCompactionPriority(const std::shared_ptr<NDataLocks::TManager>& dataLocksManager, const std::set<TInternalPathId>& pathIds,
151151
const std::optional<ui64> waitingPriority) const noexcept = 0;

ydb/core/tx/columnshard/engines/column_engine_logs.cpp

Lines changed: 32 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -453,36 +453,53 @@ bool TColumnEngineForLogs::ErasePortion(const TPortionInfo& portionInfo, bool up
453453
}
454454

455455
std::vector<std::shared_ptr<TPortionInfo>> TColumnEngineForLogs::Select(
456-
TInternalPathId pathId, TSnapshot snapshot, const TPKRangesFilter& pkRangesFilter, const bool withUncommitted) const {
456+
TInternalPathId pathId, TSnapshot snapshot, const TPKRangesFilter& pkRangesFilter, const bool withNonconflicting, const bool withConflicting, const std::optional<THashSet<TInsertWriteId>>& ownPortions) const {
457457
std::vector<std::shared_ptr<TPortionInfo>> out;
458458
auto spg = GranulesStorage->GetGranuleOptional(pathId);
459459
if (!spg) {
460460
return out;
461461
}
462462

463-
for (const auto& [_, portionInfo] : spg->GetInsertedPortions()) {
464-
if (!portionInfo->IsVisible(snapshot, !withUncommitted)) {
463+
for (const auto& [writeId, portion] : spg->GetInsertedPortions()) {
464+
if (portion->IsRemovedFor(snapshot)) {
465465
continue;
466466
}
467-
const bool skipPortion = !pkRangesFilter.IsUsed(*portionInfo);
468-
AFL_TRACE(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", skipPortion ? "portion_skipped" : "portion_selected")("pathId", pathId)(
469-
"portion", portionInfo->DebugString());
470-
if (skipPortion) {
467+
// nonconflicting: visible in the snapshot or own portions
468+
auto nonconflicting = portion->IsVisible(snapshot, true) || (ownPortions.has_value() && ownPortions->contains(writeId));
469+
auto conflicting = !nonconflicting;
470+
471+
if (nonconflicting && !withNonconflicting || conflicting && !withConflicting) {
471472
continue;
472473
}
473-
out.emplace_back(portionInfo);
474+
475+
const bool takePortion = pkRangesFilter.IsUsed(*portion);
476+
AFL_TRACE(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", takePortion ? "portion_selected" : "portion_skipped")("pathId", pathId)("portion", portion->DebugString());
477+
if (takePortion) {
478+
out.emplace_back(portion);
479+
}
474480
}
475-
for (const auto& [_, portionInfo] : spg->GetPortions()) {
476-
if (!portionInfo->IsVisible(snapshot, !withUncommitted)) {
481+
for (const auto& [_, portion] : spg->GetPortions()) {
482+
if (portion->IsRemovedFor(snapshot)) {
483+
continue;
484+
}
485+
486+
auto nonconflicting = portion->IsVisible(snapshot, true);
487+
auto conflicting = !nonconflicting;
488+
489+
// take compacted portions only if all the records are visible in the snapshot
490+
if (conflicting && portion->GetPortionType() == EPortionType::Compacted) {
477491
continue;
478492
}
479-
const bool skipPortion = !pkRangesFilter.IsUsed(*portionInfo);
480-
AFL_TRACE(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", skipPortion ? "portion_skipped" : "portion_selected")("pathId", pathId)(
481-
"portion", portionInfo->DebugString());
482-
if (skipPortion) {
493+
494+
if (nonconflicting && !withNonconflicting || conflicting && !withConflicting) {
483495
continue;
484496
}
485-
out.emplace_back(portionInfo);
497+
498+
const bool takePortion = pkRangesFilter.IsUsed(*portion);
499+
AFL_TRACE(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", takePortion ? "portion_selected" : "portion_skipped")("pathId", pathId)("portion", portion->DebugString());
500+
if (takePortion) {
501+
out.emplace_back(portion);
502+
}
486503
}
487504

488505
return out;

ydb/core/tx/columnshard/engines/column_engine_logs.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -170,7 +170,7 @@ class TColumnEngineForLogs: public IColumnEngine {
170170
void RegisterOldSchemaVersion(const TSnapshot& snapshot, const ui64 presetId, const TSchemaInitializationData& schema) override;
171171

172172
std::vector<std::shared_ptr<TPortionInfo>> Select(
173-
TInternalPathId pathId, TSnapshot snapshot, const TPKRangesFilter& pkRangesFilter, const bool withUncommitted) const override;
173+
TInternalPathId pathId, TSnapshot snapshot, const TPKRangesFilter& pkRangesFilter, const bool withNonconflicting, const bool withConflicting, const std::optional<THashSet<TInsertWriteId>>& withUncommittedOnlyForTheseWrites) const override;
174174

175175
bool IsPortionExists(const TInternalPathId pathId, const ui64 portionId) const {
176176
return !!GranulesStorage->GetPortionOptional(pathId, portionId);

ydb/core/tx/columnshard/engines/metadata_accessor.cpp

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,10 +31,18 @@ TUserTableAccessor::TUserTableAccessor(const TString& tableName, const NColumnSh
3131
}
3232

3333
std::unique_ptr<NReader::NCommon::ISourcesConstructor> TUserTableAccessor::SelectMetadata(const TSelectMetadataContext& context,
34-
const NReader::TReadDescription& readDescription, const bool withUncommitted, const bool isPlain) const {
34+
const NReader::TReadDescription& readDescription, const bool isPlain) const {
3535
AFL_VERIFY(readDescription.PKRangesFilter);
36+
// here we select portions for a read
3637
std::vector<std::shared_ptr<TPortionInfo>> portions =
37-
context.GetEngine().Select(PathId.InternalPathId, readDescription.GetSnapshot(), *readDescription.PKRangesFilter, withUncommitted);
38+
context.GetEngine().Select(
39+
PathId.InternalPathId,
40+
readDescription.GetSnapshot(),
41+
*readDescription.PKRangesFilter,
42+
readDescription.readNonconflictingPortions,
43+
readDescription.readConflictingPortions,
44+
readDescription.ownPortions
45+
);
3846
if (!isPlain) {
3947
std::deque<NReader::NSimple::TSourceConstructor> sources;
4048
for (auto&& i : portions) {
@@ -47,7 +55,7 @@ std::unique_ptr<NReader::NCommon::ISourcesConstructor> TUserTableAccessor::Selec
4755
}
4856

4957
std::unique_ptr<NReader::NCommon::ISourcesConstructor> TAbsentTableAccessor::SelectMetadata(const TSelectMetadataContext& /*context*/,
50-
const NReader::TReadDescription& /*readDescription*/, const bool /*withUncommitted*/, const bool /*isPlain*/) const {
58+
const NReader::TReadDescription& /*readDescription*/, const bool /*isPlain*/) const {
5159
return NReader::NSimple::TPortionsSources::BuildEmpty();
5260
}
5361

ydb/core/tx/columnshard/engines/metadata_accessor.h

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ class ITableMetadataAccessor {
8080
};
8181

8282
virtual std::unique_ptr<NReader::NCommon::ISourcesConstructor> SelectMetadata(const TSelectMetadataContext& context,
83-
const NReader::TReadDescription& readDescription, const bool withUncommitted, const bool isPlain) const = 0;
83+
const NReader::TReadDescription& readDescription, const bool isPlain) const = 0;
8484
virtual std::optional<TGranuleShardingInfo> GetShardingInfo(
8585
const std::shared_ptr<const TVersionedIndex>& indexVersionsPointer, const NOlap::TSnapshot& ss) const = 0;
8686
};
@@ -106,7 +106,7 @@ class TUserTableAccessor: public ITableMetadataAccessor {
106106
}
107107

108108
virtual std::unique_ptr<NReader::NCommon::ISourcesConstructor> SelectMetadata(const TSelectMetadataContext& context,
109-
const NReader::TReadDescription& readDescription, const bool withUncommitted, const bool isPlain) const override;
109+
const NReader::TReadDescription& readDescription, const bool isPlain) const override;
110110
virtual std::optional<TGranuleShardingInfo> GetShardingInfo(
111111
const std::shared_ptr<const TVersionedIndex>& indexVersionsPointer, const NOlap::TSnapshot& ss) const override {
112112
return indexVersionsPointer->GetShardingInfoOptional(PathId.GetInternalPathId(), ss);
@@ -142,7 +142,7 @@ class TAbsentTableAccessor: public ITableMetadataAccessor {
142142
return std::nullopt;
143143
}
144144
virtual std::unique_ptr<NReader::NCommon::ISourcesConstructor> SelectMetadata(const TSelectMetadataContext& context,
145-
const NReader::TReadDescription& readDescription, const bool withUncommitted, const bool isPlain) const override;
145+
const NReader::TReadDescription& readDescription, const bool isPlain) const override;
146146
};
147147

148-
} // namespace NKikimr::NOlap
148+
} // namespace NKikimr::NOlap

ydb/core/tx/columnshard/engines/reader/abstract/read_metadata.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ class TReadMetadataBase {
4646
std::shared_ptr<ISnapshotSchema> ResultIndexSchema;
4747
ui64 TxId = 0;
4848
std::optional<ui64> LockId;
49+
std::optional<NKikimrDataEvents::ELockMode> LockMode;
4950
EDeduplicationPolicy DeduplicationPolicy = EDeduplicationPolicy::ALLOW_DUPLICATES;
5051

5152
public:
@@ -55,6 +56,11 @@ class TReadMetadataBase {
5556
return TabletId;
5657
}
5758

59+
bool NeedToDetectConflicts() const {
60+
// do not detect conflicts for snapshot isolated transactions or txs with no lock
61+
return LockId.has_value() && LockMode.value_or(NKikimrDataEvents::OPTIMISTIC) != NKikimrDataEvents::OPTIMISTIC_SNAPSHOT_ISOLATION;
62+
}
63+
5864
void SetRequestedLimit(const ui64 value) {
5965
AFL_VERIFY(!RequestedLimit);
6066
if (value == 0 || value >= Max<i64>()) {

ydb/core/tx/columnshard/engines/reader/common/description.h

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
#include <ydb/core/tx/columnshard/common/snapshot.h>
44
#include <ydb/core/tx/columnshard/engines/metadata_accessor.h>
55
#include <ydb/core/tx/columnshard/engines/predicate/filter.h>
6+
#include <ydb/core/tx/columnshard/operations/manager.h>
67
#include <ydb/core/tx/program/program.h>
78

89
#include <ydb/library/yql/dq/actors/protos/dq_stats.pb.h>
@@ -33,10 +34,15 @@ class TReadDescription {
3334
// Table
3435
ui64 TxId = 0;
3536
std::optional<ui64> LockId;
37+
std::optional<NKikimrDataEvents::ELockMode> LockMode;
3638
std::shared_ptr<ITableMetadataAccessor> TableMetadataAccessor;
3739
std::shared_ptr<NOlap::TPKRangesFilter> PKRangesFilter;
3840
NYql::NDqProto::EDqStatsMode StatsMode = NYql::NDqProto::EDqStatsMode::DQ_STATS_MODE_NONE;
3941
EDeduplicationPolicy DeduplicationPolicy = EDeduplicationPolicy::ALLOW_DUPLICATES;
42+
bool readNonconflictingPortions;
43+
bool readConflictingPortions;
44+
// portions that the current tx has written
45+
std::optional<THashSet<TInsertWriteId>> ownPortions;
4046

4147
bool IsReverseSort() const {
4248
return Sorting == ERequestSorting::DESC;
@@ -55,6 +61,34 @@ class TReadDescription {
5561
ScanCursor = cursor;
5662
}
5763

64+
void SetLock(std::optional<ui64> lockId, std::optional<NKikimrDataEvents::ELockMode> lockMode, const NColumnShard::TLockFeatures* lock) {
65+
LockId = lockId;
66+
LockMode = lockMode;
67+
68+
auto snapshotIsolation = lockMode.value_or(NKikimrDataEvents::OPTIMISTIC) == NKikimrDataEvents::OPTIMISTIC_SNAPSHOT_ISOLATION;
69+
70+
// always true for now, will be false for reads that check only conflicts (comming soon with Snapshot Isolation)
71+
readNonconflictingPortions = true;
72+
73+
// do not check conflicts for Snapshot isolated txs or txs with no lock
74+
readConflictingPortions = LockId.has_value() && !snapshotIsolation;
75+
76+
if (readNonconflictingPortions && !readConflictingPortions && lock != nullptr && lock->GetWriteOperations().size() > 0) {
77+
ownPortions = THashSet<TInsertWriteId>();
78+
for (auto& writeOperation : lock->GetWriteOperations()) {
79+
for (auto insertWriteId : writeOperation->GetInsertWriteIds()) {
80+
ownPortions->emplace(insertWriteId);
81+
}
82+
}
83+
}
84+
85+
// we want to read something, don't we?
86+
AFL_VERIFY(readNonconflictingPortions || readConflictingPortions);
87+
if (ownPortions.has_value() && !ownPortions->empty()) {
88+
AFL_VERIFY(readNonconflictingPortions);
89+
}
90+
}
91+
5892
TReadDescription(const ui64 tabletId, const TSnapshot& snapshot, const ERequestSorting sorting)
5993
: Snapshot(snapshot)
6094
, Sorting(sorting)

0 commit comments

Comments
 (0)