Skip to content

Commit dad0462

Browse files
authored
Stats for CTAS fix (#29269)
1 parent c2f81e9 commit dad0462

File tree

2 files changed

+140
-8
lines changed

2 files changed

+140
-8
lines changed

ydb/core/kqp/runtime/kqp_write_actor.cpp

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -714,7 +714,6 @@ class TKqpTableWriteActor : public TActorBootstrapped<TKqpTableWriteActor> {
714714
return builder;
715715
}()
716716
<< ", Cookie=" << ev->Cookie);
717-
UpdateStats(ev->Get()->Record.GetTxStats());
718717

719718
TxManager->AddParticipantNode(ev->Sender.NodeId());
720719

@@ -743,6 +742,7 @@ class TKqpTableWriteActor : public TActorBootstrapped<TKqpTableWriteActor> {
743742
<< " ShardID=" << ev->Get()->Record.GetOrigin() << ","
744743
<< " Sink=" << this->SelfId() << "."
745744
<< getIssues().ToOneLineString());
745+
UpdateStats(ev->Get()->Record.GetTxStats());
746746
TxManager->SetError(ev->Get()->Record.GetOrigin());
747747
RuntimeError(
748748
NYql::NDqProto::StatusIds::UNSPECIFIED,
@@ -766,6 +766,7 @@ class TKqpTableWriteActor : public TActorBootstrapped<TKqpTableWriteActor> {
766766
<< " ShardID=" << ev->Get()->Record.GetOrigin() << ","
767767
<< " Sink=" << this->SelfId() << "."
768768
<< getIssues().ToOneLineString());
769+
UpdateStats(ev->Get()->Record.GetTxStats());
769770
TxManager->SetError(ev->Get()->Record.GetOrigin());
770771
RuntimeError(
771772
NYql::NDqProto::StatusIds::ABORTED,
@@ -785,6 +786,7 @@ class TKqpTableWriteActor : public TActorBootstrapped<TKqpTableWriteActor> {
785786
ResetShardRetries(ev->Get()->Record.GetOrigin(), ev->Cookie);
786787
RetryResolve();
787788
} else {
789+
UpdateStats(ev->Get()->Record.GetTxStats());
788790
TxManager->SetError(ev->Get()->Record.GetOrigin());
789791
RuntimeError(
790792
NYql::NDqProto::StatusIds::UNAVAILABLE,
@@ -800,6 +802,7 @@ class TKqpTableWriteActor : public TActorBootstrapped<TKqpTableWriteActor> {
800802
<< " ShardID=" << ev->Get()->Record.GetOrigin() << ","
801803
<< " Sink=" << this->SelfId() << "."
802804
<< getIssues().ToOneLineString());
805+
UpdateStats(ev->Get()->Record.GetTxStats());
803806
TxManager->SetError(ev->Get()->Record.GetOrigin());
804807
RuntimeError(
805808
NYql::NDqProto::StatusIds::INTERNAL_ERROR,
@@ -814,6 +817,7 @@ class TKqpTableWriteActor : public TActorBootstrapped<TKqpTableWriteActor> {
814817
<< " ShardID=" << ev->Get()->Record.GetOrigin() << ","
815818
<< " Sink=" << this->SelfId() << "."
816819
<< getIssues().ToOneLineString());
820+
UpdateStats(ev->Get()->Record.GetTxStats());
817821
TxManager->SetError(ev->Get()->Record.GetOrigin());
818822
RuntimeError(
819823
NYql::NDqProto::StatusIds::UNAVAILABLE,
@@ -832,6 +836,7 @@ class TKqpTableWriteActor : public TActorBootstrapped<TKqpTableWriteActor> {
832836
<< getIssues().ToOneLineString());
833837
// TODO: support waiting
834838
if (!InconsistentTx) {
839+
UpdateStats(ev->Get()->Record.GetTxStats());
835840
TxManager->SetError(ev->Get()->Record.GetOrigin());
836841
RuntimeError(
837842
NYql::NDqProto::StatusIds::OVERLOADED,
@@ -851,6 +856,7 @@ class TKqpTableWriteActor : public TActorBootstrapped<TKqpTableWriteActor> {
851856
<< getIssues().ToOneLineString());
852857
// TODO: support waiting
853858
if (!InconsistentTx) {
859+
UpdateStats(ev->Get()->Record.GetTxStats());
854860
TxManager->SetError(ev->Get()->Record.GetOrigin());
855861
RuntimeError(
856862
NYql::NDqProto::StatusIds::OVERLOADED,
@@ -868,6 +874,7 @@ class TKqpTableWriteActor : public TActorBootstrapped<TKqpTableWriteActor> {
868874
<< " ShardID=" << ev->Get()->Record.GetOrigin() << ","
869875
<< " Sink=" << this->SelfId() << "."
870876
<< getIssues().ToOneLineString());
877+
UpdateStats(ev->Get()->Record.GetTxStats());
871878
TxManager->SetError(ev->Get()->Record.GetOrigin());
872879
RuntimeError(
873880
NYql::NDqProto::StatusIds::CANCELLED,
@@ -882,6 +889,7 @@ class TKqpTableWriteActor : public TActorBootstrapped<TKqpTableWriteActor> {
882889
<< " ShardID=" << ev->Get()->Record.GetOrigin() << ","
883890
<< " Sink=" << this->SelfId() << "."
884891
<< getIssues().ToOneLineString());
892+
UpdateStats(ev->Get()->Record.GetTxStats());
885893
TxManager->SetError(ev->Get()->Record.GetOrigin());
886894
RuntimeError(
887895
NYql::NDqProto::StatusIds::BAD_REQUEST,
@@ -901,6 +909,7 @@ class TKqpTableWriteActor : public TActorBootstrapped<TKqpTableWriteActor> {
901909
ResetShardRetries(ev->Get()->Record.GetOrigin(), ev->Cookie);
902910
RetryResolve();
903911
} else {
912+
UpdateStats(ev->Get()->Record.GetTxStats());
904913
TxManager->SetError(ev->Get()->Record.GetOrigin());
905914
RuntimeError(
906915
NYql::NDqProto::StatusIds::SCHEME_ERROR,
@@ -918,6 +927,7 @@ class TKqpTableWriteActor : public TActorBootstrapped<TKqpTableWriteActor> {
918927
<< " Sink=" << this->SelfId() << "."
919928
<< getIssues().ToOneLineString());
920929

930+
UpdateStats(ev->Get()->Record.GetTxStats());
921931
TxManager->BreakLock(ev->Get()->Record.GetOrigin());
922932
YQL_ENSURE(TxManager->BrokenLocks());
923933
TxManager->SetError(ev->Get()->Record.GetOrigin());
@@ -934,6 +944,7 @@ class TKqpTableWriteActor : public TActorBootstrapped<TKqpTableWriteActor> {
934944
<< " ShardID=" << ev->Get()->Record.GetOrigin() << ","
935945
<< " Sink=" << this->SelfId() << "."
936946
<< getIssues().ToOneLineString());
947+
UpdateStats(ev->Get()->Record.GetTxStats());
937948
TxManager->SetError(ev->Get()->Record.GetOrigin());
938949
RuntimeError(
939950
NYql::NDqProto::StatusIds::PRECONDITION_FAILED,
@@ -951,6 +962,8 @@ class TKqpTableWriteActor : public TActorBootstrapped<TKqpTableWriteActor> {
951962
const auto& record = ev->Get()->Record;
952963
AFL_ENSURE(record.GetTxLocks().empty());
953964

965+
UpdateStats(record.GetTxStats());
966+
954967
IKqpTransactionManager::TPrepareResult preparedInfo;
955968
preparedInfo.ShardId = record.GetOrigin();
956969
preparedInfo.MinStep = record.GetMinStep();
@@ -988,6 +1001,7 @@ class TKqpTableWriteActor : public TActorBootstrapped<TKqpTableWriteActor> {
9881001
if (Mode == EMode::WRITE) {
9891002
for (const auto& lock : ev->Get()->Record.GetTxLocks()) {
9901003
if (!TxManager->AddLock(ev->Get()->Record.GetOrigin(), lock)) {
1004+
UpdateStats(ev->Get()->Record.GetTxStats());
9911005
YQL_ENSURE(TxManager->BrokenLocks());
9921006
NYql::TIssues issues;
9931007
issues.AddIssue(*TxManager->GetLockIssue());
@@ -1000,6 +1014,7 @@ class TKqpTableWriteActor : public TActorBootstrapped<TKqpTableWriteActor> {
10001014
}
10011015

10021016
if (Mode == EMode::COMMIT) {
1017+
UpdateStats(ev->Get()->Record.GetTxStats());
10031018
Callbacks->OnCommitted(ev->Get()->Record.GetOrigin(), 0);
10041019
return;
10051020
}
@@ -1008,9 +1023,11 @@ class TKqpTableWriteActor : public TActorBootstrapped<TKqpTableWriteActor> {
10081023
const auto result = ShardedWriteController->OnMessageAcknowledged(
10091024
ev->Get()->Record.GetOrigin(), ev->Cookie);
10101025
if (result && result->IsShardEmpty && Mode == EMode::IMMEDIATE_COMMIT) {
1026+
UpdateStats(ev->Get()->Record.GetTxStats());
10111027
Callbacks->OnCommitted(ev->Get()->Record.GetOrigin(), result->DataSize);
10121028
} else if (result) {
10131029
AFL_ENSURE(Mode == EMode::WRITE);
1030+
UpdateStats(ev->Get()->Record.GetTxStats());
10141031
Callbacks->OnMessageAcknowledged(result->DataSize);
10151032
}
10161033
}

ydb/core/kqp/ut/cost/kqp_cost_ut.cpp

Lines changed: 122 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1122,12 +1122,16 @@ Y_UNIT_TEST_SUITE(KqpCost) {
11221122
}
11231123

11241124

1125-
Y_UNIT_TEST_TWIN(OltpWriteRow, isSink) {
1125+
Y_UNIT_TEST_QUAD(WriteRow, isSink, isOlap) {
1126+
if (isOlap) {
1127+
// TODO: same stats for olap?
1128+
return;
1129+
}
11261130
TKikimrRunner kikimr(GetAppConfig(false, false, isSink));
11271131
auto db = kikimr.GetQueryClient();
11281132
auto session = db.GetSession().GetValueSync().GetSession();
11291133

1130-
CreateTestTable(session, false);
1134+
CreateTestTable(session, isOlap);
11311135

11321136
{
11331137
auto query = Q_(R"(
@@ -1143,7 +1147,7 @@ Y_UNIT_TEST_SUITE(KqpCost) {
11431147
auto stats = NYdb::TProtoAccessor::GetProto(*result.GetStats());
11441148

11451149
Cerr << stats.DebugString() << Endl;
1146-
size_t phase = stats.query_phases_size() - 1;
1150+
size_t phase = isOlap ? 0 : stats.query_phases_size() - 1;
11471151
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(phase).table_access(0).updates().rows(), 1);
11481152
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(phase).table_access(0).updates().bytes(), 20);
11491153

@@ -1176,7 +1180,7 @@ Y_UNIT_TEST_SUITE(KqpCost) {
11761180
auto stats = NYdb::TProtoAccessor::GetProto(*result.GetStats());
11771181

11781182
Cerr << stats.DebugString() << Endl;
1179-
size_t phase = stats.query_phases_size() - 1;
1183+
size_t phase = isOlap ? 0 : stats.query_phases_size() - 1;
11801184
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(phase).table_access(0).updates().rows(), 1);
11811185
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(phase).table_access(0).updates().bytes(), 20);
11821186

@@ -1388,13 +1392,17 @@ Y_UNIT_TEST_SUITE(KqpCost) {
13881392
}
13891393
}
13901394

1391-
Y_UNIT_TEST_TWIN(OltpWriteRowInsertFails, isSink) {
1395+
Y_UNIT_TEST_QUAD(WriteRowInsertFails, isSink, isOlap) {
1396+
if (isOlap) {
1397+
// TODO: same stats for olap?
1398+
return;
1399+
}
13921400
TKikimrRunner kikimr(GetAppConfig(false, false, isSink));
13931401
auto db = kikimr.GetQueryClient();
13941402
auto session = db.GetSession().GetValueSync().GetSession();
13951403

1396-
CreateTestTable(session, false);
1397-
CreateTestTable(session, false, "2");
1404+
CreateTestTable(session, isOlap);
1405+
CreateTestTable(session, isOlap, "2");
13981406

13991407
{
14001408
// Three inserts
@@ -1745,6 +1753,113 @@ Y_UNIT_TEST_SUITE(KqpCost) {
17451753
}
17461754
}
17471755

1756+
Y_UNIT_TEST_TWIN(CTAS, isOlap) {
1757+
TKikimrRunner kikimr(GetAppConfig(false, false, true));
1758+
auto db = kikimr.GetQueryClient();
1759+
auto session = db.GetSession().GetValueSync().GetSession();
1760+
1761+
CreateTestTable(session, isOlap);
1762+
1763+
{
1764+
auto query = std::format(R"(
1765+
CREATE TABLE `/Root/TestTable2` (PRIMARY KEY (Group, Name)) WITH (STORE={}) AS SELECT * FROM `/Root/TestTable`;
1766+
)", isOlap ? "COLUMN" : "ROW");
1767+
1768+
auto txControl = NYdb::NQuery::TTxControl::NoTx();
1769+
1770+
auto result = session.ExecuteQuery(query, txControl, GetQuerySettings()).ExtractValueSync();
1771+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
1772+
1773+
1774+
auto stats = NYdb::TProtoAccessor::GetProto(*result.GetStats());
1775+
1776+
Cerr << stats.DebugString() << Endl;
1777+
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases_size(), 1);
1778+
size_t phase = 0;
1779+
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(phase).table_access(0).updates().rows(), 4);
1780+
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(phase).table_access(0).updates().bytes(), isOlap ? 1472 : 80);
1781+
1782+
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(phase).table_access(1).reads().rows(), 4);
1783+
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(phase).table_access(1).reads().bytes(), isOlap ? 144 : 80);
1784+
1785+
Check(
1786+
FromProto(stats),
1787+
TTotalStats{
1788+
.Writes = 4,
1789+
.Reads = 4,
1790+
.Deletes = 0,
1791+
});
1792+
}
1793+
}
1794+
1795+
Y_UNIT_TEST_TWIN(CTASWithRetry, isOlap) {
1796+
auto appConfig = GetAppConfig(false, false, true);
1797+
appConfig.MutableTableServiceConfig()->MutableWriteActorSettings()->SetInFlightMemoryLimitPerActorBytes(40);
1798+
// For executing REPLACE
1799+
appConfig.MutableTableServiceConfig()->SetEnableStreamWrite(true);
1800+
TKikimrSettings settings(appConfig);
1801+
settings.SetUseRealThreads(false);
1802+
TKikimrRunner kikimr(settings);
1803+
auto db = kikimr.GetQueryClient();
1804+
auto session = kikimr.RunCall([&] { return db.GetSession().GetValueSync().GetSession(); });
1805+
1806+
auto& runtime = *kikimr.GetTestServer().GetRuntime();
1807+
1808+
kikimr.RunCall([&] {
1809+
CreateTestTable(session, isOlap);
1810+
});
1811+
1812+
size_t messages = 0;
1813+
1814+
auto grab = [&](TAutoPtr<IEventHandle> &ev) -> auto {
1815+
if (ev->GetTypeRewrite() == NEvents::TDataEvents::TEvWriteResult::EventType) {
1816+
++messages;
1817+
auto* msg = ev->Get<NEvents::TDataEvents::TEvWriteResult>();
1818+
for (size_t index = 0; index < 3; ++index) {
1819+
// Send several duplicates
1820+
auto copy = std::make_unique<NEvents::TDataEvents::TEvWriteResult>();
1821+
copy->Record = msg->Record;
1822+
runtime.Send(new IEventHandle(ev->Recipient, ev->Sender, copy.release(), ev->Flags, ev->Cookie));
1823+
}
1824+
}
1825+
return TTestActorRuntime::EEventAction::PROCESS;
1826+
};
1827+
runtime.SetObserverFunc(grab);
1828+
1829+
{
1830+
auto query = std::format(R"(
1831+
CREATE TABLE `/Root/TestTable2` (PRIMARY KEY (Group, Name)) WITH (STORE={}) AS SELECT * FROM `/Root/TestTable`;
1832+
)", isOlap ? "COLUMN" : "ROW");
1833+
1834+
auto txControl = NYdb::NQuery::TTxControl::NoTx();
1835+
1836+
auto result = kikimr.RunCall([&] { return session.ExecuteQuery(query, txControl, GetQuerySettings()).ExtractValueSync(); });
1837+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
1838+
1839+
1840+
auto stats = NYdb::TProtoAccessor::GetProto(*result.GetStats());
1841+
1842+
Cerr << stats.DebugString() << Endl;
1843+
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases_size(), 1);
1844+
size_t phase = 0;
1845+
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(phase).table_access(0).updates().rows(), 4);
1846+
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(phase).table_access(0).updates().bytes(), isOlap ? 1472 : 80);
1847+
1848+
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(phase).table_access(1).reads().rows(), 4);
1849+
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(phase).table_access(1).reads().bytes(), isOlap ? 144 : 80);
1850+
1851+
Check(
1852+
FromProto(stats),
1853+
TTotalStats{
1854+
.Writes = 4,
1855+
.Reads = 4,
1856+
.Deletes = 0,
1857+
});
1858+
}
1859+
1860+
UNIT_ASSERT_EQUAL(messages, isOlap ? 4 : 1);
1861+
}
1862+
17481863
}
17491864

17501865
}

0 commit comments

Comments
 (0)