Skip to content

Commit 8bf4163

Browse files
authored
Extract MlpWriter class (#29297)
1 parent 5230310 commit 8bf4163

25 files changed

+745
-381
lines changed

ydb/core/http_proxy/ut/sqs_topic_ut.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -279,6 +279,7 @@ Y_UNIT_TEST_SUITE(TestSqsTopicHttpProxy) {
279279
}}
280280
});
281281

282+
Cerr << (TStringBuilder() << "json = " << WriteJson(json, true, true) << '\n');
282283
UNIT_ASSERT_VALUES_EQUAL(json["Successful"].GetArray().size(), 3);
283284
auto succesful0 = json["Successful"][0];
284285
UNIT_ASSERT_VALUES_EQUAL(succesful0["Id"], "Id-0");

ydb/core/persqueue/pqtablet/partition/mlp/mlp_common.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ std::unique_ptr<TEvPQ::TEvSetClientInfo> MakeEvCommit(
6666
consumer.GetGeneration(),
6767
0, // step
6868
TActorId{} // pipeClient
69-
);
69+
);
7070
}
7171

7272
std::unique_ptr<TEvPersQueue::TEvHasDataInfo> MakeEvHasData(

ydb/core/persqueue/pqtablet/partition/mlp/mlp_consumer.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -669,7 +669,7 @@ bool TConsumerActor::FetchMessagesIfNeeded() {
669669

670670
void TConsumerActor::Handle(TEvPQ::TEvProxyResponse::TPtr& ev) {
671671
LOG_D("Handle TEvPQ::TEvProxyResponse");
672-
672+
673673
AFL_ENSURE(IsSucess(ev))("e", ev->Get()->Response->DebugString());
674674
}
675675

@@ -710,7 +710,7 @@ void TConsumerActor::Handle(TEvPersQueue::TEvResponse::TPtr& ev) {
710710
AFL_ENSURE(res)("o", result.GetOffset());
711711

712712
for (auto& attr : *proto.MutableMessageMeta()) {
713-
if (attr.key() == NMessageConsts::MessageId) {
713+
if (attr.key() == MESSAGE_KEY) {
714714
messageGroupId = std::move(*attr.mutable_value());
715715
} else if (attr.key() == NMessageConsts::DelaySeconds) {
716716
delaySeconds = std::stoul(attr.value());

ydb/core/persqueue/pqtablet/partition/mlp/mlp_consumer.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ class TConsumerActor : public TBaseTabletActor<TConsumerActor>
7777

7878
void CommitIfNeeded();
7979
void UpdateStorageConfig();
80-
80+
8181
size_t RequiredToFetchMessageCount() const;
8282
void SendToPQTablet(std::unique_ptr<IEventBase> ev);
8383

ydb/core/persqueue/pqtablet/partition/mlp/mlp_storage.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -725,7 +725,7 @@ void TStorage::MoveBaseDeadline() {
725725
}
726726

727727
auto newBaseDeadline = TrimToSeconds(TimeProvider->Now(), false);
728-
auto newBaseWriteTimestamp = BaseWriteTimestamp +
728+
auto newBaseWriteTimestamp = BaseWriteTimestamp +
729729
(SlowMessages.empty() ? TDuration::Seconds(Messages.front().WriteTimestampDelta)
730730
: TDuration::Seconds(SlowMessages.begin()->second.WriteTimestampDelta));
731731

@@ -816,7 +816,7 @@ TString TStorage::DebugString() const {
816816
<< " BaseDeadline: " << BaseDeadline.ToString()
817817
<< " BaseWriteTimestamp: " << BaseWriteTimestamp.ToString()
818818
<< " Messages: [";
819-
819+
820820
auto dump = [&](const auto offset, const auto& message, auto zone) {
821821
sb << zone <<"{" << offset << ", "
822822
<< static_cast<EMessageStatus>(message.Status) << ", "

ydb/core/persqueue/pqtablet/partition/mlp/mlp_storage_ut.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ struct TUtils {
7777

7878
utils.AssertEquals(*this);
7979
}
80-
}
80+
}
8181

8282
NKikimrPQ::TMLPStorageSnapshot CreateSnapshot() {
8383
// Clear batch
@@ -1410,7 +1410,7 @@ Y_UNIT_TEST(StorageSerialization_WAL_WithMoveBaseTime_Deadline) {
14101410
UNIT_ASSERT(message);
14111411
UNIT_ASSERT_VALUES_EQUAL(message->DeadlineDelta, 5);
14121412
}
1413-
1413+
14141414
timeProvider->Tick(TDuration::Seconds(3));
14151415
storage.MoveBaseDeadline();
14161416
{

ydb/core/persqueue/public/constants.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@ namespace NKikimr::NPQ {
77
static const TString CLIENTID_COMPACTION_CONSUMER = "__ydb_compaction_consumer";
88
static const TString CLIENTID_WITHOUT_CONSUMER = "$without_consumer";
99

10+
static constexpr TStringBuf MESSAGE_KEY = "__key";
11+
1012
static constexpr ui32 METRICS_LEVEL_DISABLED = 0;
1113
static constexpr ui32 METRICS_LEVEL_DATABASE = 1;
1214
static constexpr ui32 METRICS_LEVEL_OBJECT = 2;

ydb/core/persqueue/public/mlp/mlp.h

Lines changed: 33 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
#pragma once
22

33
#include <ydb/core/persqueue/events/events.h>
4+
#include <ydb/core/persqueue/public/describer/describer.h>
45
#include <ydb/public/api/protos/ydb_status_codes.pb.h>
56
#include <ydb/public/api/protos/ydb_topic.pb.h>
67
#include <ydb/library/actors/core/actorsystem_fwd.h>
@@ -13,6 +14,7 @@ namespace NKikimr::NPQ::NMLP {
1314

1415
enum EEv : ui32 {
1516
EvReadResponse = InternalEventSpaceBegin(NPQ::NEvents::EServices::MLP),
17+
EvWriteResponse,
1618
EvChangeResponse,
1719
EvEnd
1820
};
@@ -22,6 +24,18 @@ struct TMessageId {
2224
ui64 Offset;
2325
};
2426

27+
struct TEvWriteResponse : public NActors::TEventLocal<TEvWriteResponse, EEv::EvWriteResponse> {
28+
29+
NDescriber::EStatus DescribeStatus;
30+
31+
struct TMessage {
32+
size_t Index;
33+
// if message was written successfully, it will be set
34+
std::optional<TMessageId> MessageId;
35+
};
36+
std::vector<TMessage> Messages;
37+
};
38+
2539
struct TEvReadResponse : public NActors::TEventLocal<TEvReadResponse, EEv::EvReadResponse> {
2640

2741
TEvReadResponse(Ydb::StatusIds::StatusCode status = Ydb::StatusIds::SUCCESS, TString&& errorDescription = {})
@@ -41,7 +55,6 @@ struct TEvReadResponse : public NActors::TEventLocal<TEvReadResponse, EEv::EvRea
4155
TInstant SentTimestamp;
4256
TString MessageGroupId;
4357
};
44-
// The original topic path (from request) -> TopicInfo
4558
std::vector<TMessage> Messages;
4659
};
4760

@@ -61,11 +74,29 @@ struct TEvChangeResponse : public NActors::TEventLocal<TEvChangeResponse, EEv::E
6174
TMessageId MessageId;
6275
bool Success = false;
6376
};
64-
// The original topic path (from request) -> TopicInfo
6577
std::vector<TResult> Messages;
6678
};
6779

80+
struct TWriterSettings {
81+
TString DatabasePath;
82+
TString TopicName;
83+
84+
struct TMessage {
85+
size_t Index;
86+
TString MessageBody;
87+
std::optional<TString> MessageGroupId;
88+
std::optional<TString> MessageDeduplicationId;
89+
std::optional<TString> SerializedMessageAttributes;
90+
TDuration Delay;
91+
};
92+
std::vector<TMessage> Messages;
93+
94+
bool ShouldBeCharged = false;
95+
96+
TIntrusiveConstPtr<NACLib::TUserToken> UserToken;
97+
};
6898

99+
IActor* CreateWriter(const NActors::TActorId& parentId, TWriterSettings&& settings);
69100

70101
struct TReaderSettings {
71102
TString DatabasePath;

ydb/core/persqueue/public/mlp/mlp_changer_ut.cpp

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ Y_UNIT_TEST_SUITE(TMLPChangerTests) {
66

77
Y_UNIT_TEST(TopicNotExists) {
88
auto setup = CreateSetup();
9-
9+
1010
auto& runtime = setup->GetRuntime();
1111
CreateCommitterActor(runtime, {
1212
.DatabasePath = "/Root",
@@ -25,7 +25,7 @@ Y_UNIT_TEST(ConsumerNotExists) {
2525
auto setup = CreateSetup();
2626

2727
ExecuteDDL(*setup, "CREATE TOPIC topic1");
28-
28+
2929
auto& runtime = setup->GetRuntime();
3030
CreateCommitterActor(runtime, {
3131
.DatabasePath = "/Root",
@@ -43,7 +43,7 @@ Y_UNIT_TEST(PartitionNotExists) {
4343
auto setup = CreateSetup();
4444

4545
CreateTopic(setup, "/Root/topic1", "mlp-consumer");
46-
46+
4747
auto& runtime = setup->GetRuntime();
4848
CreateCommitterActor(runtime, {
4949
.DatabasePath = "/Root",
@@ -69,7 +69,7 @@ Y_UNIT_TEST(CommitTest) {
6969
setup->Write("/Root/topic1", "msg-2", 0);
7070

7171
Sleep(TDuration::Seconds(2));
72-
72+
7373
auto& runtime = setup->GetRuntime();
7474
CreateCommitterActor(runtime, {
7575
.DatabasePath = "/Root",
@@ -269,7 +269,7 @@ Y_UNIT_TEST(CapacityTest) {
269269
hFunc(NMLP::TEvChangeResponse, Handle);
270270
sFunc(TEvents::TEvPoison, PassAway);
271271
sFunc(TEvents::TEvWakeup, PassAway);
272-
}
272+
}
273273
}
274274

275275
size_t Infly = 0;
Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,13 @@
11
#pragma once
22

3+
#include <ydb/core/persqueue/public/constants.h>
4+
35
#include <util/generic/strbuf.h>
46

57
namespace NKikimr::NPQ::NMLP {
68
namespace NMessageConsts {
79
constexpr TStringBuf MessageDeduplicationId = "__MessageDeduplicationId";
810
constexpr TStringBuf MessageAttributes = "__MessageAttributes";
911
constexpr TStringBuf DelaySeconds = "__DelaySeconds";
10-
constexpr TStringBuf MessageId = "__MessageId";
11-
1212
} // namespace NMessageConsts
1313
} // namespace NKikimr::NPQ::NMLP

0 commit comments

Comments
 (0)