Skip to content

Commit 45eda5a

Browse files
Alek5andr-KotovGazizonoki
authored andcommitted
Long tests were placed in a separate suite (#27194)
1 parent 3cb78b9 commit 45eda5a

File tree

5 files changed

+3128
-3106
lines changed

5 files changed

+3128
-3106
lines changed

.github/last_commit.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
265bf81d86234be2a1f2d2834ef72e36b4f43e73
1+
79c359a5ca4220eac67b66bfb80b0d1a9ea8837c
Lines changed: 138 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,138 @@
1+
#include <src/client/topic/ut/ut_utils/txusage_fixture.h>
2+
#include <library/cpp/testing/unittest/registar.h>
3+
4+
namespace NYdb::inline V3::NTopic::NTests::NTxUsage {
5+
6+
Y_UNIT_TEST_SUITE(TxUsage) {
7+
8+
Y_UNIT_TEST_F(Transactions_Conflict_On_SeqNo_Table, TFixtureTable)
9+
{
10+
TestTransactionsConflictOnSeqNo();
11+
}
12+
13+
Y_UNIT_TEST_F(Transactions_Conflict_On_SeqNo_Query, TFixtureQuery)
14+
{
15+
TestTransactionsConflictOnSeqNo();
16+
}
17+
18+
Y_UNIT_TEST_F(WriteToTopic_Demo_44_Table, TFixtureTable)
19+
{
20+
TestWriteToTopic44();
21+
}
22+
23+
Y_UNIT_TEST_F(WriteToTopic_Demo_44_Query, TFixtureQuery)
24+
{
25+
TestWriteToTopic44();
26+
}
27+
28+
Y_UNIT_TEST_F(Write_Random_Sized_Messages_In_Wide_Transactions_Table, TFixtureTable)
29+
{
30+
TestWriteRandomSizedMessagesInWideTransactions();
31+
}
32+
33+
Y_UNIT_TEST_F(Write_Random_Sized_Messages_In_Wide_Transactions_Query, TFixtureQuery)
34+
{
35+
TestWriteRandomSizedMessagesInWideTransactions();
36+
}
37+
38+
Y_UNIT_TEST_F(Write_Only_Big_Messages_In_Wide_Transactions_Table, TFixtureTable)
39+
{
40+
TestWriteOnlyBigMessagesInWideTransactions();
41+
}
42+
43+
Y_UNIT_TEST_F(Write_Only_Big_Messages_In_Wide_Transactions_Query, TFixtureQuery)
44+
{
45+
TestWriteOnlyBigMessagesInWideTransactions();
46+
}
47+
48+
Y_UNIT_TEST_F(Write_And_Read_Big_Messages_1, TFixtureNoClient)
49+
{
50+
TestWriteAndReadMessages(27, 64'000 * 12, false);
51+
}
52+
53+
Y_UNIT_TEST_F(Write_And_Read_Big_Messages_2, TFixtureNoClient)
54+
{
55+
TestWriteAndReadMessages(27, 64'000 * 12, true);
56+
}
57+
58+
Y_UNIT_TEST_F(Write_And_Read_Huge_Messages_1, TFixtureNoClient)
59+
{
60+
TestWriteAndReadMessages(4, 9'000'000, false);
61+
}
62+
63+
Y_UNIT_TEST_F(Write_And_Read_Huge_Messages_2, TFixtureNoClient)
64+
{
65+
TestWriteAndReadMessages(4, 9'000'000, true);
66+
}
67+
68+
Y_UNIT_TEST_F(Write_And_Read_Gigant_Messages_1, TFixtureNoClient)
69+
{
70+
TestWriteAndReadMessages(4, 61'000'000, false);
71+
}
72+
73+
Y_UNIT_TEST_F(Write_And_Read_Gigant_Messages_2, TFixtureNoClient)
74+
{
75+
TestWriteAndReadMessages(4, 61'000'000, true);
76+
}
77+
78+
Y_UNIT_TEST_F(Write_50k_100times_50tx, TFixtureTable)
79+
{
80+
// 100 transactions. Write 100 50KB messages in each folder. Call the commit at the same time.
81+
// As a result, there will be a lot of small blobs in the FastWrite zone of the main batch,
82+
// which will be picked up by a compact. The scenario is similar to the work of Ya.Metrika.
83+
84+
const std::size_t PARTITIONS_COUNT = 2;
85+
const std::size_t TXS_COUNT = 50;
86+
87+
auto makeSourceId = [](unsigned txId, unsigned partitionId) {
88+
std::string sourceId = TEST_MESSAGE_GROUP_ID;
89+
sourceId += "_";
90+
sourceId += ToString(txId);
91+
sourceId += "_";
92+
sourceId += ToString(partitionId);
93+
return sourceId;
94+
};
95+
96+
CreateTopic("topic_A", TEST_CONSUMER, PARTITIONS_COUNT);
97+
98+
SetPartitionWriteSpeed("topic_A", 50'000'000);
99+
100+
std::vector<std::unique_ptr<TFixture::ISession>> sessions;
101+
std::vector<std::unique_ptr<TTransactionBase>> transactions;
102+
103+
for (std::size_t i = 0; i < TXS_COUNT; ++i) {
104+
sessions.push_back(CreateSession());
105+
auto& session = sessions.back();
106+
107+
transactions.push_back(session->BeginTx());
108+
auto& tx = transactions.back();
109+
110+
auto sourceId = makeSourceId(i, 0);
111+
for (size_t j = 0; j < 100; ++j) {
112+
WriteToTopic("topic_A", sourceId, std::string(50'000, 'x'), tx.get(), 0);
113+
}
114+
WaitForAcks("topic_A", sourceId);
115+
116+
sourceId = makeSourceId(i, 1);
117+
WriteToTopic("topic_A", sourceId, std::string(50'000, 'x'), tx.get(), 1);
118+
WaitForAcks("topic_A", sourceId);
119+
}
120+
121+
// We are doing an asynchronous commit of transactions. They will be executed simultaneously.
122+
std::vector<TAsyncStatus> futures;
123+
124+
for (std::size_t i = 0; i < TXS_COUNT; ++i) {
125+
futures.push_back(sessions[i]->AsyncCommitTx(*transactions[i]));
126+
}
127+
128+
// All transactions must be completed successfully.
129+
for (std::size_t i = 0; i < TXS_COUNT; ++i) {
130+
futures[i].Wait();
131+
const auto& result = futures[i].GetValueSync();
132+
UNIT_ASSERT_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
133+
}
134+
}
135+
136+
}
137+
138+
}

0 commit comments

Comments
 (0)