diff --git a/ydb/core/protos/tx_columnshard.proto b/ydb/core/protos/tx_columnshard.proto index 86619e5203c1..6447c6368214 100644 --- a/ydb/core/protos/tx_columnshard.proto +++ b/ydb/core/protos/tx_columnshard.proto @@ -147,6 +147,7 @@ enum ETransactionKind { TX_KIND_SHARING = 7; TX_KIND_COMMIT_WRITE_PRIMARY = 8; TX_KIND_COMMIT_WRITE_SECONDARY = 9; + TX_KIND_RESTORE = 10; } enum ETransactionFlag { @@ -202,6 +203,10 @@ message TBackupTxBody { optional NKikimrSchemeOp.TBackupTask BackupTask = 1; } +message TRestoreTxBody { + optional NKikimrSchemeOp.TRestoreTask RestoreTask = 1; +} + message TCommitWriteTxBody { optional uint64 LockId = 1; diff --git a/ydb/core/tx/columnshard/backup/import/control.cpp b/ydb/core/tx/columnshard/backup/import/control.cpp new file mode 100644 index 000000000000..8a768f29f695 --- /dev/null +++ b/ydb/core/tx/columnshard/backup/import/control.cpp @@ -0,0 +1,54 @@ +#include "control.h" +#include "session.h" + +namespace NKikimr::NOlap::NImport { + +NKikimr::TConclusionStatus TConfirmSessionControl::DoApply(const std::shared_ptr& session) const { + auto exportSession = dynamic_pointer_cast(session); + AFL_VERIFY(exportSession); + exportSession->Confirm(); + return TConclusionStatus::Success(); +} + +NKikimr::TConclusionStatus TAbortSessionControl::DoApply(const std::shared_ptr& session) const { + auto exportSession = dynamic_pointer_cast(session); + AFL_VERIFY(exportSession); + exportSession->Abort(); + return TConclusionStatus::Success(); +} + +TString TConfirmSessionControl::GetClassNameStatic() { + return "CS::IMPORT::CONFIRM"; +} + +TString TConfirmSessionControl::GetClassName() const { + return GetClassNameStatic(); +} + +TString TAbortSessionControl::GetClassName() const { + return GetClassNameStatic(); +} + +TString TAbortSessionControl::GetClassNameStatic() { + return "CS::IMPORT::ABORT"; +} + +TConclusionStatus TConfirmSessionControl::DoDeserializeFromProto(const NKikimrColumnShardExportProto::TSessionControlContainer & /*proto*/) { + return TConclusionStatus::Success(); +} + +NKikimrColumnShardExportProto::TSessionControlContainer TConfirmSessionControl::DoSerializeToProto() const { + NKikimrColumnShardExportProto::TSessionControlContainer result; + return result; +} + +TConclusionStatus TAbortSessionControl::DoDeserializeFromProto(const NKikimrColumnShardExportProto::TSessionControlContainer & /*proto*/) { + return TConclusionStatus::Success(); +} + +NKikimrColumnShardExportProto::TSessionControlContainer TAbortSessionControl::DoSerializeToProto() const { + NKikimrColumnShardExportProto::TSessionControlContainer result; + return result; +} + +} // namespace NKikimr::NOlap::NImport diff --git a/ydb/core/tx/columnshard/backup/import/control.h b/ydb/core/tx/columnshard/backup/import/control.h new file mode 100644 index 000000000000..2bc899fe6895 --- /dev/null +++ b/ydb/core/tx/columnshard/backup/import/control.h @@ -0,0 +1,40 @@ +#pragma once +#include +#include +#include +#include + +namespace NKikimr::NOlap::NImport { + +class TConfirmSessionControl: public NBackgroundTasks::TInterfaceProtoAdapter { +private: + using TBase = NBackgroundTasks::TInterfaceProtoAdapter; +public: + static TString GetClassNameStatic(); + +private: + virtual TConclusionStatus DoApply(const std::shared_ptr& session) const override; + virtual TConclusionStatus DoDeserializeFromProto(const NKikimrColumnShardExportProto::TSessionControlContainer& /*proto*/) override; + virtual NKikimrColumnShardExportProto::TSessionControlContainer DoSerializeToProto() const override; + virtual TString GetClassName() const override; + static const inline TFactory::TRegistrator Registrator = TFactory::TRegistrator(GetClassNameStatic()); +public: + using TBase::TBase; +}; + +class TAbortSessionControl: public NBackgroundTasks::TInterfaceProtoAdapter { +private: + using TBase = NBackgroundTasks::TInterfaceProtoAdapter; +public: + static TString GetClassNameStatic(); + +private: + virtual TConclusionStatus DoApply(const std::shared_ptr& session) const override; + virtual TConclusionStatus DoDeserializeFromProto(const NKikimrColumnShardExportProto::TSessionControlContainer& /*proto*/) override; + virtual NKikimrColumnShardExportProto::TSessionControlContainer DoSerializeToProto() const override; + virtual TString GetClassName() const override; + static const inline TFactory::TRegistrator Registrator = TFactory::TRegistrator(GetClassNameStatic()); +public: + using TBase::TBase; +}; +} diff --git a/ydb/core/tx/columnshard/backup/import/import_actor.cpp b/ydb/core/tx/columnshard/backup/import/import_actor.cpp new file mode 100644 index 000000000000..d1ae309f27ae --- /dev/null +++ b/ydb/core/tx/columnshard/backup/import/import_actor.cpp @@ -0,0 +1,61 @@ +#include "import_actor.h" +#include + +namespace NKikimr::NOlap::NImport { + +class TTxProposeFinish: public NTabletFlatExecutor::TTransactionBase { +private: + using TBase = NTabletFlatExecutor::TTransactionBase; + const ui64 TxId; +protected: + virtual bool Execute(NTabletFlatExecutor::TTransactionContext& txc, const TActorContext& /*ctx*/) override { + Self->GetProgressTxController().FinishProposeOnExecute(TxId, txc); + return true; + } + virtual void Complete(const TActorContext& ctx) override { + Self->GetProgressTxController().FinishProposeOnComplete(TxId, ctx); + } +public: + TTxProposeFinish(NColumnShard::TColumnShard* self, const ui64 txId) + : TBase(self) + , TxId(txId) { + } +}; + +void TImportActor::OnSessionStateSaved() { + AFL_VERIFY(ImportSession->IsFinished()); + NYDBTest::TControllers::GetColumnShardController()->OnExportFinished(); + if (ImportSession->GetTxId()) { + ExecuteTransaction(std::make_unique(GetShardVerified(), *ImportSession->GetTxId())); + } else { + Session->FinishActor(); + } +} + +void TImportActor::SwitchStage(const EStage from, const EStage to) { + AFL_VERIFY(Stage == from)("from", (ui32)from)("real", (ui32)Stage)("to", + (ui32)to); + Stage = to; +} + +void TImportActor::OnTxCompleted(const ui64 /*txId*/) { + Session->FinishActor(); +} + +void TImportActor::OnBootstrap(const TActorContext & /*ctx*/) { + SwitchStage(EStage::Initialization, EStage::WaitSaveCursor); + SaveSessionProgress(); + Become(&TImportActor::StateFunc); +} + +TImportActor::TImportActor(std::shared_ptr bgSession, const std::shared_ptr &adapter) + : TBase(bgSession, adapter) { + ImportSession = bgSession->GetLogicAsVerifiedPtr(); +} + +void TImportActor::OnSessionProgressSaved() { + SwitchStage(EStage::WaitSaveCursor, EStage::WaitData); + SaveSessionState(); +} + +} // namespace NKikimr::NOlap::NImport \ No newline at end of file diff --git a/ydb/core/tx/columnshard/backup/import/import_actor.h b/ydb/core/tx/columnshard/backup/import/import_actor.h new file mode 100644 index 000000000000..5622974540ab --- /dev/null +++ b/ydb/core/tx/columnshard/backup/import/import_actor.h @@ -0,0 +1,55 @@ +#pragma once + +#include +#include +#include +#include +#include +#include + +#include + +namespace NKikimr::NOlap::NImport { + +class TImportActor: public NBackground::TSessionActor { +private: + enum class EStage { + Initialization, + WaitData, + WaitWriting, + WaitSaveCursor, + Finished + }; + + using TBase = NBackground::TSessionActor; + + EStage Stage = EStage::Initialization; + std::shared_ptr ImportSession; + void SwitchStage(const EStage from, const EStage to); + + protected: + + virtual void OnSessionStateSaved() override; + + virtual void OnTxCompleted(const ui64 /*txId*/) override; + + virtual void OnSessionProgressSaved() override; + + virtual void OnBootstrap(const TActorContext & /*ctx*/) override; + + public: + TImportActor(std::shared_ptr bgSession, const std::shared_ptr &adapter); + + STATEFN(StateFunc) { + try { + switch (ev->GetTypeRewrite()) { + default: + TBase::StateInProgress(ev); + } + } catch (...) { + AFL_VERIFY(false); + } + } +}; + +} // namespace NKikimr::NOlap::NImport diff --git a/ydb/core/tx/columnshard/backup/import/protos/task.proto b/ydb/core/tx/columnshard/backup/import/protos/task.proto new file mode 100644 index 000000000000..0501881fc4d4 --- /dev/null +++ b/ydb/core/tx/columnshard/backup/import/protos/task.proto @@ -0,0 +1,22 @@ +package NKikimrColumnShardImportProto; + +message TIdentifier { + optional uint64 PathId = 1; +} + +message TImportTask { + optional TIdentifier Identifier = 1; + optional uint64 TxId = 5; +} + +message TSessionControlContainer { + optional string ClassName = 1; +} + +message TImportSessionLogic { + optional TImportTask Task = 1; +} + +message TImportSessionState { + optional string Status = 1; +} diff --git a/ydb/core/tx/columnshard/backup/import/protos/ya.make b/ydb/core/tx/columnshard/backup/import/protos/ya.make new file mode 100644 index 000000000000..beb845fbb38b --- /dev/null +++ b/ydb/core/tx/columnshard/backup/import/protos/ya.make @@ -0,0 +1,8 @@ +PROTO_LIBRARY() +PROTOC_FATAL_WARNINGS() + +SRCS( + task.proto +) + +END() diff --git a/ydb/core/tx/columnshard/backup/import/session.cpp b/ydb/core/tx/columnshard/backup/import/session.cpp new file mode 100644 index 000000000000..8b3124c0acd5 --- /dev/null +++ b/ydb/core/tx/columnshard/backup/import/session.cpp @@ -0,0 +1,116 @@ +#include "session.h" + +#include +#include +#include + +namespace NKikimr::NOlap::NImport { + +NKikimr::TConclusion> TSession::DoCreateActor(const NBackground::TStartContext& context) const { + AFL_VERIFY(IsConfirmed()); + Status = EStatus::Started; + return std::make_unique(context.GetSessionSelfPtr(), context.GetAdapter()); +} + +void TSession::Finish() { + AFL_VERIFY(Status == EStatus::Started); + Status = EStatus::Finished; +} + +const TInternalPathId TSession::GetInternalPathId() const { + return Task->GetInternalPathId(); +} + +const TImportTask &TSession::GetTask() const { + return *Task; +} + +bool TSession::IsStarted() const { + return Status == EStatus::Started; +} + +void TSession::Abort() { + AFL_VERIFY(Status != EStatus::Finished && Status != EStatus::Aborted); + Status = EStatus::Aborted; +} + +void TSession::Confirm() { + AFL_VERIFY(IsDraft()); + Status = EStatus::Confirmed; +} + +bool TSession::IsDraft() const { + return Status == EStatus::Draft; +} + +TString TSession::DebugString() const { + return TStringBuilder() << "task=" << Task->DebugString() + << ";status=" << Status; +} + +bool TSession::IsConfirmed() const { + return Status == EStatus::Confirmed; +} + +TSession::TSession(const std::shared_ptr &task) : Task(task) { + AFL_VERIFY(Task); +} + +TString TSession::GetClassName() const { + return GetClassNameStatic(); +} + +bool TSession::IsReadyForRemoveOnFinished() const { + return Status == EStatus::Aborted; +} + +bool TSession::IsFinished() const { + return Status == EStatus::Finished; +} + +bool TSession::IsReadyForStart() const { + return Status == EStatus::Confirmed; +} + +std::optional TSession::GetTxId() const { + return Task->GetTxId(); +} + +TSession::TProtoLogic TSession::DoSerializeToProto() const { + TProtoLogic result; + *result.MutableTask() = Task->SerializeToProto(); + return result; +} + +TConclusionStatus TSession::DoDeserializeFromProto(const TProtoLogic &proto) { + Task = std::make_shared(); + return Task->DeserializeFromProto(proto.GetTask()); +} + +TSession::TProtoState TSession::DoSerializeStateToProto() const { + TProtoState result; + result.SetStatus(::ToString(Status)); + return result; +} + +TConclusionStatus TSession::DoDeserializeStateFromProto(const TProtoState &proto) { + if (!TryFromString(proto.GetStatus(), Status)) { + return TConclusionStatus::Fail("cannot read status from proto: " + + proto.GetStatus()); + } + return TConclusionStatus::Success(); +} + +TSession::TProtoProgress TSession::DoSerializeProgressToProto() const { + return NProtoBuf::Empty{}; +} + +TConclusionStatus TSession::DoDeserializeProgressFromProto(const TProtoProgress & /* proto */) { + return TConclusionStatus::Success(); +} + +TString TSession::GetClassNameStatic() { + return "CS::IMPORT"; +} + +} // namespace NKikimr::NOlap::NImport diff --git a/ydb/core/tx/columnshard/backup/import/session.h b/ydb/core/tx/columnshard/backup/import/session.h new file mode 100644 index 000000000000..2f8cf5f4f428 --- /dev/null +++ b/ydb/core/tx/columnshard/backup/import/session.h @@ -0,0 +1,88 @@ +#pragma once +#include "task.h" + +#include +#include +#include +#include + +namespace NKikimr::NColumnShard { +class TColumnShard; +} + +namespace NKikimr::NTabletFlatExecutor { +class ITransaction; +} + +namespace NKikimr::NOlap { +class IStoragesManager; +} + +namespace NKikimr::NOlap::NImport { +class TSession: public NBackground::TSessionProtoAdapter { +public: + static TString GetClassNameStatic(); + + enum class EStatus : ui64 { + Draft = 0 /*"draft"*/, + Confirmed = 1 /*"confirmed"*/, + Started = 2 /*"started"*/, + Finished = 3 /*"finished"*/, + Aborted = 4 /*"aborted"*/ + }; + +private: + std::shared_ptr Task; + mutable EStatus Status = EStatus::Draft; + + virtual TConclusion> DoCreateActor(const NBackground::TStartContext& context) const override; + + virtual TConclusionStatus DoDeserializeProgressFromProto(const TProtoProgress & /* proto */) override; + + virtual TProtoProgress DoSerializeProgressToProto() const override; + + virtual TConclusionStatus DoDeserializeStateFromProto(const TProtoState &proto) override; + + virtual TProtoState DoSerializeStateToProto() const override; + + virtual TConclusionStatus DoDeserializeFromProto(const TProtoLogic &proto) override; + + virtual TProtoLogic DoSerializeToProto() const override; + + static const inline TFactory::TRegistrator Registrator = TFactory::TRegistrator(GetClassNameStatic()); + +public: + std::optional GetTxId() const; + + virtual bool IsReadyForStart() const override; + + virtual bool IsFinished() const override; + + virtual bool IsReadyForRemoveOnFinished() const override; + + virtual TString GetClassName() const override; + + TSession() = default; + + TSession(const std::shared_ptr &task); + + bool IsConfirmed() const; + + TString DebugString() const; + + bool IsDraft() const; + + void Confirm(); + + void Abort(); + + bool IsStarted() const; + + const TImportTask &GetTask() const; + + const TInternalPathId GetInternalPathId() const; + + void Finish(); +}; + +} // namespace NKikimr::NOlap::NImport diff --git a/ydb/core/tx/columnshard/backup/import/task.cpp b/ydb/core/tx/columnshard/backup/import/task.cpp new file mode 100644 index 000000000000..07cd85866096 --- /dev/null +++ b/ydb/core/tx/columnshard/backup/import/task.cpp @@ -0,0 +1,62 @@ +#include "task.h" +#include "session.h" +#include "control.h" +#include + +namespace NKikimr::NOlap::NImport { + +NKikimr::TConclusionStatus TImportTask::DoDeserializeFromProto(const NKikimrColumnShardImportProto::TImportTask& proto) { + InternalPathId = TInternalPathId::FromRawValue(proto.GetIdentifier().GetPathId()); + if (proto.HasTxId()) { + TxId = proto.GetTxId(); + } + return TConclusionStatus::Success(); +} + +NKikimrColumnShardImportProto::TImportTask TImportTask::DoSerializeToProto() const { + NKikimrColumnShardImportProto::TImportTask result; + result.MutableIdentifier()->SetPathId(InternalPathId.GetRawValue()); + if (TxId) { + result.SetTxId(*TxId); + } + return result; +} + +NBackground::TSessionControlContainer TImportTask::BuildConfirmControl() const { + return NBackground::TSessionControlContainer(std::make_shared(), std::make_shared(GetClassName(), ::ToString(InternalPathId.DebugString()))); +} + +NBackground::TSessionControlContainer TImportTask::BuildAbortControl() const { + return NBackground::TSessionControlContainer(std::make_shared(), std::make_shared(GetClassName(), ::ToString(InternalPathId.DebugString()))); +} + +std::shared_ptr TImportTask::DoBuildSession() const { + auto result = std::make_shared(std::make_shared(InternalPathId, TxId)); + if (!!TxId) { + result->Confirm(); + } + return result; +} + +TString TImportTask::GetClassNameStatic() { + return "CS::EXPORT"; +} + +TString TImportTask::GetClassName() const { + return GetClassNameStatic(); +} + +const TInternalPathId TImportTask::GetInternalPathId() const { + return InternalPathId; +} + +TImportTask::TImportTask(const TInternalPathId &internalPathId, + const std::optional txId) + : InternalPathId(internalPathId), TxId(txId) { +} + +TString TImportTask::DebugString() const { + return TStringBuilder() << "{internal_path_id=" << InternalPathId.DebugString() << ";}"; +} + +} // namespace NKikimr::NOlap::NImport diff --git a/ydb/core/tx/columnshard/backup/import/task.h b/ydb/core/tx/columnshard/backup/import/task.h new file mode 100644 index 000000000000..8ab1cd6e9cf1 --- /dev/null +++ b/ydb/core/tx/columnshard/backup/import/task.h @@ -0,0 +1,45 @@ +#pragma once +#include + +#include +#include +#include +#include +#include +#include + +#include +#include + +namespace NKikimr::NOlap::NImport { + +class TImportTask: public NBackgroundTasks::TInterfaceProtoAdapter { +public: + static TString GetClassNameStatic(); + static const inline TFactory::TRegistrator Registrator = TFactory::TRegistrator(GetClassNameStatic()); + +private: + TInternalPathId InternalPathId; + YDB_READONLY_DEF(std::optional, TxId); + +private: + virtual TConclusionStatus DoDeserializeFromProto(const NKikimrColumnShardImportProto::TImportTask& proto) override; + virtual NKikimrColumnShardImportProto::TImportTask DoSerializeToProto() const override; + + virtual std::shared_ptr DoBuildSession() const override; + +public: + virtual TString GetClassName() const override; + + NBackground::TSessionControlContainer BuildConfirmControl() const; + NBackground::TSessionControlContainer BuildAbortControl() const; + + const TInternalPathId GetInternalPathId() const; + + TImportTask() = default; + + TImportTask(const TInternalPathId &internalPathId, const std::optional txId = {}); + + TString DebugString() const; +}; +} diff --git a/ydb/core/tx/columnshard/backup/import/ya.make b/ydb/core/tx/columnshard/backup/import/ya.make new file mode 100644 index 000000000000..eda0ecae6675 --- /dev/null +++ b/ydb/core/tx/columnshard/backup/import/ya.make @@ -0,0 +1,20 @@ +LIBRARY() + +SRCS( + GLOBAL session.cpp + GLOBAL task.cpp + GLOBAL control.cpp + import_actor.cpp +) + +PEERDIR( + ydb/core/kqp/compute_actor + ydb/core/scheme + ydb/core/tablet_flat + ydb/core/tx/columnshard/backup/import/protos + ydb/core/tx/columnshard/bg_tasks +) + +GENERATE_ENUM_SERIALIZATION(session.h) + +END() diff --git a/ydb/core/tx/columnshard/backup/ya.make b/ydb/core/tx/columnshard/backup/ya.make index 78bae5ba1e45..ca946eedc78f 100644 --- a/ydb/core/tx/columnshard/backup/ya.make +++ b/ydb/core/tx/columnshard/backup/ya.make @@ -2,6 +2,7 @@ LIBRARY() PEERDIR( ydb/core/tx/columnshard/backup/iscan + ydb/core/tx/columnshard/backup/import ) END() diff --git a/ydb/core/tx/columnshard/transactions/operators/restore.cpp b/ydb/core/tx/columnshard/transactions/operators/restore.cpp new file mode 100644 index 000000000000..f76acbf15d56 --- /dev/null +++ b/ydb/core/tx/columnshard/transactions/operators/restore.cpp @@ -0,0 +1,85 @@ +#include "restore.h" +#include +#include +#include + +namespace NKikimr::NColumnShard { + +bool TRestoreTransactionOperator::DoParse(TColumnShard& owner, const TString& data) { + NKikimrTxColumnShard::TRestoreTxBody txBody; + if (!txBody.ParseFromString(data)) { + return false; + } + if (!txBody.HasRestoreTask()) { + return false; + } + ImportTask = std::make_shared(TInternalPathId::FromRawValue(txBody.GetRestoreTask().GetTableId()), GetTxId()); + NOlap::NBackground::TTask task(::ToString(txBody.GetRestoreTask().GetTableId()), std::make_shared(), ImportTask); + if (!owner.GetBackgroundSessionsManager()->HasTask(task)) { + TxAddTask = owner.GetBackgroundSessionsManager()->TxAddTask(task); + if (!TxAddTask) { + AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("event", "cannot_add_task"); + return false; + } + } else { + TaskExists = true; + } + return true; +} + +TRestoreTransactionOperator::TProposeResult TRestoreTransactionOperator::DoStartProposeOnExecute(TColumnShard& /*owner*/, NTabletFlatExecutor::TTransactionContext& txc) { + if (!TaskExists) { + AFL_VERIFY(!!TxAddTask); + AFL_VERIFY(TxAddTask->Execute(txc, NActors::TActivationContext::AsActorContext())); + } + return TProposeResult(); +} + +void TRestoreTransactionOperator::DoStartProposeOnComplete(TColumnShard& /*owner*/, const TActorContext& ctx) { + if (!TaskExists) { + AFL_VERIFY(!!TxAddTask); + TxAddTask->Complete(ctx); + TxAddTask.reset(); + } +} + +bool TRestoreTransactionOperator::ProgressOnExecute( + TColumnShard& /*owner*/, const NOlap::TSnapshot& /*version*/, NTabletFlatExecutor::TTransactionContext& /*txc*/) { + return true; +} + +bool TRestoreTransactionOperator::ProgressOnComplete(TColumnShard& /*owner*/, const TActorContext& /*ctx*/) { + return true; +} + +bool TRestoreTransactionOperator::ExecuteOnAbort(TColumnShard& owner, NTabletFlatExecutor::TTransactionContext& txc) { + if (!TxAbort) { + auto control = ImportTask->BuildAbortControl(); + TxAbort = owner.GetBackgroundSessionsManager()->TxApplyControl(control); + } + return TxAbort->Execute(txc, NActors::TActivationContext::AsActorContext()); +} + +TString TRestoreTransactionOperator::DoDebugString() const { + return "RESTORE"; +} + +bool TRestoreTransactionOperator::CompleteOnAbort(TColumnShard & /*owner*/, const TActorContext & /*ctx*/) { + return true; +} + +bool TRestoreTransactionOperator::DoIsAsync() const { + return true; +} + +TString TRestoreTransactionOperator::DoGetOpType() const { + return "Restore"; +} + +void TRestoreTransactionOperator::DoFinishProposeOnComplete(TColumnShard & /*owner*/, const TActorContext & /*ctx*/) { +} + +void TRestoreTransactionOperator::DoFinishProposeOnExecute(TColumnShard & /*owner*/, NTabletFlatExecutor::TTransactionContext & /*txc*/) { +} + +} // namespace NKikimr::NColumnShard diff --git a/ydb/core/tx/columnshard/transactions/operators/restore.h b/ydb/core/tx/columnshard/transactions/operators/restore.h new file mode 100644 index 000000000000..59c4f59514dd --- /dev/null +++ b/ydb/core/tx/columnshard/transactions/operators/restore.h @@ -0,0 +1,40 @@ +#pragma once + +#include "propose_tx.h" +#include +#include + +namespace NKikimr::NColumnShard { + +class TRestoreTransactionOperator: public IProposeTxOperator, public TMonitoringObjectsCounter { +private: + using TBase = IProposeTxOperator; + + std::shared_ptr ImportTask; + bool TaskExists = false; + std::unique_ptr TxAddTask; + std::unique_ptr TxAbort; + using TProposeResult = TTxController::TProposeResult; + static inline auto Registrator = TFactory::TRegistrator(NKikimrTxColumnShard::TX_KIND_RESTORE); + + virtual TTxController::TProposeResult DoStartProposeOnExecute(TColumnShard& owner, NTabletFlatExecutor::TTransactionContext& txc) override; + virtual void DoStartProposeOnComplete(TColumnShard& /*owner*/, const TActorContext& /*ctx*/) override; + virtual void DoFinishProposeOnExecute(TColumnShard & /*owner*/, NTabletFlatExecutor::TTransactionContext & /*txc*/) override; + virtual void DoFinishProposeOnComplete(TColumnShard & /*owner*/, const TActorContext & /*ctx*/) override; + virtual TString DoGetOpType() const override; + virtual bool DoIsAsync() const override; + virtual bool DoParse(TColumnShard& owner, const TString& data) override; + virtual TString DoDebugString() const override; + + public: + using TBase::TBase; + + virtual bool ProgressOnExecute(TColumnShard& owner, const NOlap::TSnapshot& version, NTabletFlatExecutor::TTransactionContext& txc) override; + + virtual bool ProgressOnComplete(TColumnShard& owner, const TActorContext& ctx) override; + + virtual bool ExecuteOnAbort(TColumnShard& owner, NTabletFlatExecutor::TTransactionContext& txc) override; + virtual bool CompleteOnAbort(TColumnShard & /*owner*/, const TActorContext & /*ctx*/) override; +}; +} + diff --git a/ydb/core/tx/columnshard/transactions/operators/ya.make b/ydb/core/tx/columnshard/transactions/operators/ya.make index 238fb7a1a36f..7149fd81279a 100644 --- a/ydb/core/tx/columnshard/transactions/operators/ya.make +++ b/ydb/core/tx/columnshard/transactions/operators/ya.make @@ -4,13 +4,15 @@ SRCS( GLOBAL schema.cpp GLOBAL backup.cpp GLOBAL sharing.cpp + GLOBAL restore.cpp propose_tx.cpp ) PEERDIR( + ydb/core/tx/columnshard/backup/import ydb/core/tx/columnshard/data_sharing/destination/events - ydb/core/tx/columnshard/transactions/operators/ev_write ydb/core/tx/columnshard/export/session + ydb/core/tx/columnshard/transactions/operators/ev_write ) END()