Skip to content

Commit f0211ff

Browse files
authored
Merge 7356e50 into b8bc549
2 parents b8bc549 + 7356e50 commit f0211ff

File tree

16 files changed

+706
-0
lines changed

16 files changed

+706
-0
lines changed

ydb/core/protos/tx_columnshard.proto

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,7 @@ enum ETransactionKind {
147147
TX_KIND_SHARING = 7;
148148
TX_KIND_COMMIT_WRITE_PRIMARY = 8;
149149
TX_KIND_COMMIT_WRITE_SECONDARY = 9;
150+
TX_KIND_RESTORE = 10;
150151
}
151152

152153
enum ETransactionFlag {
@@ -202,6 +203,10 @@ message TBackupTxBody {
202203
optional NKikimrSchemeOp.TBackupTask BackupTask = 1;
203204
}
204205

206+
message TRestoreTxBody {
207+
optional NKikimrSchemeOp.TRestoreTask RestoreTask = 1;
208+
}
209+
205210
message TCommitWriteTxBody {
206211
optional uint64 LockId = 1;
207212

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
#include "control.h"
2+
#include "session.h"
3+
4+
namespace NKikimr::NOlap::NImport {
5+
6+
NKikimr::TConclusionStatus TConfirmSessionControl::DoApply(const std::shared_ptr<NBackground::ISessionLogic>& session) const {
7+
auto exportSession = dynamic_pointer_cast<TSession>(session);
8+
AFL_VERIFY(exportSession);
9+
exportSession->Confirm();
10+
return TConclusionStatus::Success();
11+
}
12+
13+
NKikimr::TConclusionStatus TAbortSessionControl::DoApply(const std::shared_ptr<NBackground::ISessionLogic>& session) const {
14+
auto exportSession = dynamic_pointer_cast<TSession>(session);
15+
AFL_VERIFY(exportSession);
16+
exportSession->Abort();
17+
return TConclusionStatus::Success();
18+
}
19+
20+
TString TConfirmSessionControl::GetClassNameStatic() {
21+
return "CS::IMPORT::CONFIRM";
22+
}
23+
24+
TString TConfirmSessionControl::GetClassName() const {
25+
return GetClassNameStatic();
26+
}
27+
28+
TString TAbortSessionControl::GetClassName() const {
29+
return GetClassNameStatic();
30+
}
31+
32+
TString TAbortSessionControl::GetClassNameStatic() {
33+
return "CS::IMPORT::ABORT";
34+
}
35+
36+
TConclusionStatus TConfirmSessionControl::DoDeserializeFromProto(const NKikimrColumnShardExportProto::TSessionControlContainer & /*proto*/) {
37+
return TConclusionStatus::Success();
38+
}
39+
40+
NKikimrColumnShardExportProto::TSessionControlContainer TConfirmSessionControl::DoSerializeToProto() const {
41+
NKikimrColumnShardExportProto::TSessionControlContainer result;
42+
return result;
43+
}
44+
45+
TConclusionStatus TAbortSessionControl::DoDeserializeFromProto(const NKikimrColumnShardExportProto::TSessionControlContainer & /*proto*/) {
46+
return TConclusionStatus::Success();
47+
}
48+
49+
NKikimrColumnShardExportProto::TSessionControlContainer TAbortSessionControl::DoSerializeToProto() const {
50+
NKikimrColumnShardExportProto::TSessionControlContainer result;
51+
return result;
52+
}
53+
54+
} // namespace NKikimr::NOlap::NImport
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
#pragma once
2+
#include <ydb/services/bg_tasks/abstract/interface.h>
3+
#include <ydb/core/tx/columnshard/bg_tasks/abstract/control.h>
4+
#include <ydb/core/tx/columnshard/bg_tasks/abstract/session.h>
5+
#include <ydb/core/tx/columnshard/export/protos/task.pb.h>
6+
7+
namespace NKikimr::NOlap::NImport {
8+
9+
class TConfirmSessionControl: public NBackgroundTasks::TInterfaceProtoAdapter<NKikimrColumnShardExportProto::TSessionControlContainer, NBackground::ISessionLogicControl> {
10+
private:
11+
using TBase = NBackgroundTasks::TInterfaceProtoAdapter<NKikimrColumnShardExportProto::TSessionControlContainer, NBackground::ISessionLogicControl>;
12+
public:
13+
static TString GetClassNameStatic();
14+
15+
private:
16+
virtual TConclusionStatus DoApply(const std::shared_ptr<NBackground::ISessionLogic>& session) const override;
17+
virtual TConclusionStatus DoDeserializeFromProto(const NKikimrColumnShardExportProto::TSessionControlContainer& /*proto*/) override;
18+
virtual NKikimrColumnShardExportProto::TSessionControlContainer DoSerializeToProto() const override;
19+
virtual TString GetClassName() const override;
20+
static const inline TFactory::TRegistrator<TConfirmSessionControl> Registrator = TFactory::TRegistrator<TConfirmSessionControl>(GetClassNameStatic());
21+
public:
22+
using TBase::TBase;
23+
};
24+
25+
class TAbortSessionControl: public NBackgroundTasks::TInterfaceProtoAdapter<NKikimrColumnShardExportProto::TSessionControlContainer, NBackground::ISessionLogicControl> {
26+
private:
27+
using TBase = NBackgroundTasks::TInterfaceProtoAdapter<NKikimrColumnShardExportProto::TSessionControlContainer, NBackground::ISessionLogicControl>;
28+
public:
29+
static TString GetClassNameStatic();
30+
31+
private:
32+
virtual TConclusionStatus DoApply(const std::shared_ptr<NBackground::ISessionLogic>& session) const override;
33+
virtual TConclusionStatus DoDeserializeFromProto(const NKikimrColumnShardExportProto::TSessionControlContainer& /*proto*/) override;
34+
virtual NKikimrColumnShardExportProto::TSessionControlContainer DoSerializeToProto() const override;
35+
virtual TString GetClassName() const override;
36+
static const inline TFactory::TRegistrator<TAbortSessionControl> Registrator = TFactory::TRegistrator<TAbortSessionControl>(GetClassNameStatic());
37+
public:
38+
using TBase::TBase;
39+
};
40+
}
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
#include "import_actor.h"
2+
#include <ydb/core/tx/columnshard/columnshard_impl.h>
3+
4+
namespace NKikimr::NOlap::NImport {
5+
6+
class TTxProposeFinish: public NTabletFlatExecutor::TTransactionBase<NColumnShard::TColumnShard> {
7+
private:
8+
using TBase = NTabletFlatExecutor::TTransactionBase<NColumnShard::TColumnShard>;
9+
const ui64 TxId;
10+
protected:
11+
virtual bool Execute(NTabletFlatExecutor::TTransactionContext& txc, const TActorContext& /*ctx*/) override {
12+
Self->GetProgressTxController().FinishProposeOnExecute(TxId, txc);
13+
return true;
14+
}
15+
virtual void Complete(const TActorContext& ctx) override {
16+
Self->GetProgressTxController().FinishProposeOnComplete(TxId, ctx);
17+
}
18+
public:
19+
TTxProposeFinish(NColumnShard::TColumnShard* self, const ui64 txId)
20+
: TBase(self)
21+
, TxId(txId) {
22+
}
23+
};
24+
25+
void TImportActor::OnSessionStateSaved() {
26+
AFL_VERIFY(ImportSession->IsFinished());
27+
NYDBTest::TControllers::GetColumnShardController()->OnExportFinished();
28+
if (ImportSession->GetTxId()) {
29+
ExecuteTransaction(std::make_unique<TTxProposeFinish>(GetShardVerified<NColumnShard::TColumnShard>(), *ImportSession->GetTxId()));
30+
} else {
31+
Session->FinishActor();
32+
}
33+
}
34+
35+
void TImportActor::SwitchStage(const EStage from, const EStage to) {
36+
AFL_VERIFY(Stage == from)("from", (ui32)from)("real", (ui32)Stage)("to",
37+
(ui32)to);
38+
Stage = to;
39+
}
40+
41+
void TImportActor::OnTxCompleted(const ui64 /*txId*/) {
42+
Session->FinishActor();
43+
}
44+
45+
void TImportActor::OnBootstrap(const TActorContext & /*ctx*/) {
46+
SwitchStage(EStage::WaitWriting, EStage::WaitSaveCursor);
47+
SaveSessionProgress();
48+
Become(&TImportActor::StateFunc);
49+
}
50+
51+
TImportActor::TImportActor(std::shared_ptr<NBackground::TSession> bgSession, const std::shared_ptr<NBackground::ITabletAdapter> &adapter)
52+
: TBase(bgSession, adapter) {
53+
ImportSession = bgSession->GetLogicAsVerifiedPtr<NImport::TSession>();
54+
}
55+
56+
void TImportActor::OnSessionProgressSaved() {
57+
SwitchStage(EStage::WaitSaveCursor, EStage::WaitData);
58+
SaveSessionState();
59+
}
60+
61+
} // namespace NKikimr::NOlap::NImport
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
#pragma once
2+
3+
#include <ydb/core/formats/arrow/serializer/abstract.h>
4+
#include <ydb/core/kqp/compute_actor/kqp_compute_events.h>
5+
#include <ydb/core/tx/columnshard/bg_tasks/manager/actor.h>
6+
#include <ydb/core/tx/columnshard/blobs_action/abstract/storage.h>
7+
#include <ydb/core/tx/columnshard/backup/import/session.h>
8+
#include <ydb/core/tx/columnshard/hooks/abstract/abstract.h>
9+
10+
#include <ydb/library/actors/core/actor_bootstrapped.h>
11+
12+
namespace NKikimr::NOlap::NImport {
13+
14+
class TImportActor: public NBackground::TSessionActor {
15+
private:
16+
enum class EStage {
17+
Initialization,
18+
WaitData,
19+
WaitWriting,
20+
WaitSaveCursor,
21+
Finished
22+
};
23+
24+
using TBase = NBackground::TSessionActor;
25+
26+
EStage Stage = EStage::Initialization;
27+
std::shared_ptr<NImport::TSession> ImportSession;
28+
void SwitchStage(const EStage from, const EStage to);
29+
30+
protected:
31+
32+
virtual void OnSessionStateSaved() override;
33+
34+
virtual void OnTxCompleted(const ui64 /*txId*/) override;
35+
36+
virtual void OnSessionProgressSaved() override;
37+
38+
virtual void OnBootstrap(const TActorContext & /*ctx*/) override;
39+
40+
public:
41+
TImportActor(std::shared_ptr<NBackground::TSession> bgSession, const std::shared_ptr<NBackground::ITabletAdapter> &adapter);
42+
43+
STATEFN(StateFunc) {
44+
try {
45+
switch (ev->GetTypeRewrite()) {
46+
default:
47+
TBase::StateInProgress(ev);
48+
}
49+
} catch (...) {
50+
AFL_VERIFY(false);
51+
}
52+
}
53+
};
54+
55+
} // namespace NKikimr::NOlap::NImport
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
package NKikimrColumnShardImportProto;
2+
3+
message TIdentifier {
4+
optional uint64 PathId = 1;
5+
}
6+
7+
message TImportTask {
8+
optional TIdentifier Identifier = 1;
9+
optional uint64 TxId = 5;
10+
}
11+
12+
message TSessionControlContainer {
13+
optional string ClassName = 1;
14+
}
15+
16+
message TImportSessionLogic {
17+
optional TImportTask Task = 1;
18+
}
19+
20+
message TImportSessionState {
21+
optional string Status = 1;
22+
}
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
PROTO_LIBRARY()
2+
PROTOC_FATAL_WARNINGS()
3+
4+
SRCS(
5+
task.proto
6+
)
7+
8+
END()
Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
#include "session.h"
2+
3+
#include <ydb/core/tx/columnshard/backup/import/import_actor.h>
4+
#include <ydb/core/tx/columnshard/bg_tasks/abstract/adapter.h>
5+
#include <ydb/core/tx/columnshard/columnshard_impl.h>
6+
7+
namespace NKikimr::NOlap::NImport {
8+
9+
NKikimr::TConclusion<std::unique_ptr<NActors::IActor>> TSession::DoCreateActor(const NBackground::TStartContext& context) const {
10+
AFL_VERIFY(IsConfirmed());
11+
Status = EStatus::Started;
12+
return std::make_unique<TImportActor>(context.GetSessionSelfPtr(), context.GetAdapter());
13+
}
14+
15+
void TSession::Finish() {
16+
AFL_VERIFY(Status == EStatus::Started);
17+
Status = EStatus::Finished;
18+
}
19+
20+
const TInternalPathId TSession::GetInternalPathId() const {
21+
return Task->GetInternalPathId();
22+
}
23+
24+
const TImportTask &TSession::GetTask() const {
25+
return *Task;
26+
}
27+
28+
bool TSession::IsStarted() const {
29+
return Status == EStatus::Started;
30+
}
31+
32+
void TSession::Abort() {
33+
AFL_VERIFY(Status != EStatus::Finished && Status != EStatus::Aborted);
34+
Status = EStatus::Aborted;
35+
}
36+
37+
void TSession::Confirm() {
38+
AFL_VERIFY(IsDraft());
39+
Status = EStatus::Confirmed;
40+
}
41+
42+
bool TSession::IsDraft() const {
43+
return Status == EStatus::Draft;
44+
}
45+
46+
TString TSession::DebugString() const {
47+
return TStringBuilder() << "task=" << Task->DebugString()
48+
<< ";status=" << Status;
49+
}
50+
51+
bool TSession::IsConfirmed() const {
52+
return Status == EStatus::Confirmed;
53+
}
54+
55+
TSession::TSession(const std::shared_ptr<TImportTask> &task) : Task(task) {
56+
AFL_VERIFY(Task);
57+
}
58+
59+
TString TSession::GetClassName() const {
60+
return GetClassNameStatic();
61+
}
62+
63+
bool TSession::IsReadyForRemoveOnFinished() const {
64+
return Status == EStatus::Aborted;
65+
}
66+
67+
bool TSession::IsFinished() const {
68+
return Status == EStatus::Finished;
69+
}
70+
71+
bool TSession::IsReadyForStart() const {
72+
return Status == EStatus::Confirmed;
73+
}
74+
75+
std::optional<ui64> TSession::GetTxId() const {
76+
return Task->GetTxId();
77+
}
78+
79+
TSession::TProtoLogic TSession::DoSerializeToProto() const {
80+
TProtoLogic result;
81+
*result.MutableTask() = Task->SerializeToProto();
82+
return result;
83+
}
84+
85+
TConclusionStatus TSession::DoDeserializeFromProto(const TProtoLogic &proto) {
86+
Task = std::make_shared<TImportTask>();
87+
return Task->DeserializeFromProto(proto.GetTask());
88+
}
89+
90+
TSession::TProtoState TSession::DoSerializeStateToProto() const {
91+
TProtoState result;
92+
if (Status == EStatus::Started) {
93+
result.SetStatus(::ToString(EStatus::Confirmed));
94+
} else {
95+
result.SetStatus(::ToString(Status));
96+
}
97+
return result;
98+
}
99+
100+
TConclusionStatus TSession::DoDeserializeStateFromProto(const TProtoState &proto) {
101+
if (!TryFromString(proto.GetStatus(), Status)) {
102+
return TConclusionStatus::Fail("cannot read status from proto: " +
103+
proto.GetStatus());
104+
}
105+
return TConclusionStatus::Success();
106+
}
107+
108+
TSession::TProtoProgress TSession::DoSerializeProgressToProto() const {
109+
return NProtoBuf::Empty{};
110+
}
111+
112+
TConclusionStatus TSession::DoDeserializeProgressFromProto(const TProtoProgress & /* proto */) {
113+
return TConclusionStatus::Success();
114+
}
115+
116+
TString TSession::GetClassNameStatic() {
117+
return "CS::EXPORT";
118+
}
119+
120+
} // namespace NKikimr::NOlap::NImport

0 commit comments

Comments
 (0)