Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions ydb/core/protos/tx_columnshard.proto
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ enum ETransactionKind {
TX_KIND_SHARING = 7;
TX_KIND_COMMIT_WRITE_PRIMARY = 8;
TX_KIND_COMMIT_WRITE_SECONDARY = 9;
TX_KIND_RESTORE = 10;
}

enum ETransactionFlag {
Expand Down Expand Up @@ -202,6 +203,10 @@ message TBackupTxBody {
optional NKikimrSchemeOp.TBackupTask BackupTask = 1;
}

message TRestoreTxBody {
optional NKikimrSchemeOp.TRestoreTask RestoreTask = 1;
}

message TCommitWriteTxBody {
optional uint64 LockId = 1;

Expand Down
54 changes: 54 additions & 0 deletions ydb/core/tx/columnshard/backup/import/control.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
#include "control.h"
#include "session.h"

namespace NKikimr::NOlap::NImport {

NKikimr::TConclusionStatus TConfirmSessionControl::DoApply(const std::shared_ptr<NBackground::ISessionLogic>& session) const {
auto exportSession = dynamic_pointer_cast<TSession>(session);
AFL_VERIFY(exportSession);
exportSession->Confirm();
return TConclusionStatus::Success();
}

NKikimr::TConclusionStatus TAbortSessionControl::DoApply(const std::shared_ptr<NBackground::ISessionLogic>& session) const {
auto exportSession = dynamic_pointer_cast<TSession>(session);
AFL_VERIFY(exportSession);
exportSession->Abort();
return TConclusionStatus::Success();
}

TString TConfirmSessionControl::GetClassNameStatic() {
return "CS::IMPORT::CONFIRM";
}

TString TConfirmSessionControl::GetClassName() const {
return GetClassNameStatic();
}

TString TAbortSessionControl::GetClassName() const {
return GetClassNameStatic();
}

TString TAbortSessionControl::GetClassNameStatic() {
return "CS::IMPORT::ABORT";
}

TConclusionStatus TConfirmSessionControl::DoDeserializeFromProto(const NKikimrColumnShardExportProto::TSessionControlContainer & /*proto*/) {
return TConclusionStatus::Success();
}

NKikimrColumnShardExportProto::TSessionControlContainer TConfirmSessionControl::DoSerializeToProto() const {
NKikimrColumnShardExportProto::TSessionControlContainer result;
return result;
}

TConclusionStatus TAbortSessionControl::DoDeserializeFromProto(const NKikimrColumnShardExportProto::TSessionControlContainer & /*proto*/) {
return TConclusionStatus::Success();
}

NKikimrColumnShardExportProto::TSessionControlContainer TAbortSessionControl::DoSerializeToProto() const {
NKikimrColumnShardExportProto::TSessionControlContainer result;
return result;
}

} // namespace NKikimr::NOlap::NImport
40 changes: 40 additions & 0 deletions ydb/core/tx/columnshard/backup/import/control.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
#pragma once
#include <ydb/services/bg_tasks/abstract/interface.h>
#include <ydb/core/tx/columnshard/bg_tasks/abstract/control.h>
#include <ydb/core/tx/columnshard/bg_tasks/abstract/session.h>
#include <ydb/core/tx/columnshard/export/protos/task.pb.h>

namespace NKikimr::NOlap::NImport {

class TConfirmSessionControl: public NBackgroundTasks::TInterfaceProtoAdapter<NKikimrColumnShardExportProto::TSessionControlContainer, NBackground::ISessionLogicControl> {
private:
using TBase = NBackgroundTasks::TInterfaceProtoAdapter<NKikimrColumnShardExportProto::TSessionControlContainer, NBackground::ISessionLogicControl>;
public:
static TString GetClassNameStatic();

private:
virtual TConclusionStatus DoApply(const std::shared_ptr<NBackground::ISessionLogic>& session) const override;
virtual TConclusionStatus DoDeserializeFromProto(const NKikimrColumnShardExportProto::TSessionControlContainer& /*proto*/) override;
virtual NKikimrColumnShardExportProto::TSessionControlContainer DoSerializeToProto() const override;
virtual TString GetClassName() const override;
static const inline TFactory::TRegistrator<TConfirmSessionControl> Registrator = TFactory::TRegistrator<TConfirmSessionControl>(GetClassNameStatic());
public:
using TBase::TBase;
};

class TAbortSessionControl: public NBackgroundTasks::TInterfaceProtoAdapter<NKikimrColumnShardExportProto::TSessionControlContainer, NBackground::ISessionLogicControl> {
private:
using TBase = NBackgroundTasks::TInterfaceProtoAdapter<NKikimrColumnShardExportProto::TSessionControlContainer, NBackground::ISessionLogicControl>;
public:
static TString GetClassNameStatic();

private:
virtual TConclusionStatus DoApply(const std::shared_ptr<NBackground::ISessionLogic>& session) const override;
virtual TConclusionStatus DoDeserializeFromProto(const NKikimrColumnShardExportProto::TSessionControlContainer& /*proto*/) override;
virtual NKikimrColumnShardExportProto::TSessionControlContainer DoSerializeToProto() const override;
virtual TString GetClassName() const override;
static const inline TFactory::TRegistrator<TAbortSessionControl> Registrator = TFactory::TRegistrator<TAbortSessionControl>(GetClassNameStatic());
public:
using TBase::TBase;
};
}
61 changes: 61 additions & 0 deletions ydb/core/tx/columnshard/backup/import/import_actor.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
#include "import_actor.h"
#include <ydb/core/tx/columnshard/columnshard_impl.h>

namespace NKikimr::NOlap::NImport {

class TTxProposeFinish: public NTabletFlatExecutor::TTransactionBase<NColumnShard::TColumnShard> {
private:
using TBase = NTabletFlatExecutor::TTransactionBase<NColumnShard::TColumnShard>;
const ui64 TxId;
protected:
virtual bool Execute(NTabletFlatExecutor::TTransactionContext& txc, const TActorContext& /*ctx*/) override {
Self->GetProgressTxController().FinishProposeOnExecute(TxId, txc);
return true;
}
virtual void Complete(const TActorContext& ctx) override {
Self->GetProgressTxController().FinishProposeOnComplete(TxId, ctx);
}
public:
TTxProposeFinish(NColumnShard::TColumnShard* self, const ui64 txId)
: TBase(self)
, TxId(txId) {
}
};

void TImportActor::OnSessionStateSaved() {
AFL_VERIFY(ImportSession->IsFinished());
NYDBTest::TControllers::GetColumnShardController()->OnExportFinished();
if (ImportSession->GetTxId()) {
ExecuteTransaction(std::make_unique<TTxProposeFinish>(GetShardVerified<NColumnShard::TColumnShard>(), *ImportSession->GetTxId()));
} else {
Session->FinishActor();
}
}

void TImportActor::SwitchStage(const EStage from, const EStage to) {
AFL_VERIFY(Stage == from)("from", (ui32)from)("real", (ui32)Stage)("to",
(ui32)to);
Stage = to;
}

void TImportActor::OnTxCompleted(const ui64 /*txId*/) {
Session->FinishActor();
}

void TImportActor::OnBootstrap(const TActorContext & /*ctx*/) {
SwitchStage(EStage::Initialization, EStage::WaitSaveCursor);
SaveSessionProgress();
Become(&TImportActor::StateFunc);
}

TImportActor::TImportActor(std::shared_ptr<NBackground::TSession> bgSession, const std::shared_ptr<NBackground::ITabletAdapter> &adapter)
: TBase(bgSession, adapter) {
ImportSession = bgSession->GetLogicAsVerifiedPtr<NImport::TSession>();
}

void TImportActor::OnSessionProgressSaved() {
SwitchStage(EStage::WaitSaveCursor, EStage::WaitData);
SaveSessionState();
}

} // namespace NKikimr::NOlap::NImport
55 changes: 55 additions & 0 deletions ydb/core/tx/columnshard/backup/import/import_actor.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
#pragma once

#include <ydb/core/formats/arrow/serializer/abstract.h>
#include <ydb/core/kqp/compute_actor/kqp_compute_events.h>
#include <ydb/core/tx/columnshard/bg_tasks/manager/actor.h>
#include <ydb/core/tx/columnshard/blobs_action/abstract/storage.h>
#include <ydb/core/tx/columnshard/backup/import/session.h>
#include <ydb/core/tx/columnshard/hooks/abstract/abstract.h>

#include <ydb/library/actors/core/actor_bootstrapped.h>

namespace NKikimr::NOlap::NImport {

class TImportActor: public NBackground::TSessionActor {
private:
enum class EStage {
Initialization,
WaitData,
WaitWriting,
WaitSaveCursor,
Finished
};

using TBase = NBackground::TSessionActor;

EStage Stage = EStage::Initialization;
std::shared_ptr<NImport::TSession> ImportSession;
void SwitchStage(const EStage from, const EStage to);

protected:

virtual void OnSessionStateSaved() override;

virtual void OnTxCompleted(const ui64 /*txId*/) override;

virtual void OnSessionProgressSaved() override;

virtual void OnBootstrap(const TActorContext & /*ctx*/) override;

public:
TImportActor(std::shared_ptr<NBackground::TSession> bgSession, const std::shared_ptr<NBackground::ITabletAdapter> &adapter);

STATEFN(StateFunc) {
try {
switch (ev->GetTypeRewrite()) {
default:
TBase::StateInProgress(ev);
}
} catch (...) {
AFL_VERIFY(false);
}
}
};

} // namespace NKikimr::NOlap::NImport
22 changes: 22 additions & 0 deletions ydb/core/tx/columnshard/backup/import/protos/task.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package NKikimrColumnShardImportProto;

message TIdentifier {
optional uint64 PathId = 1;
}

message TImportTask {
optional TIdentifier Identifier = 1;
optional uint64 TxId = 5;
}

message TSessionControlContainer {
optional string ClassName = 1;
}

message TImportSessionLogic {
optional TImportTask Task = 1;
}

message TImportSessionState {
optional string Status = 1;
}
8 changes: 8 additions & 0 deletions ydb/core/tx/columnshard/backup/import/protos/ya.make
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
PROTO_LIBRARY()
PROTOC_FATAL_WARNINGS()

SRCS(
task.proto
)

END()
116 changes: 116 additions & 0 deletions ydb/core/tx/columnshard/backup/import/session.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
#include "session.h"

#include <ydb/core/tx/columnshard/backup/import/import_actor.h>
#include <ydb/core/tx/columnshard/bg_tasks/abstract/adapter.h>
#include <ydb/core/tx/columnshard/columnshard_impl.h>

namespace NKikimr::NOlap::NImport {

NKikimr::TConclusion<std::unique_ptr<NActors::IActor>> TSession::DoCreateActor(const NBackground::TStartContext& context) const {
AFL_VERIFY(IsConfirmed());
Status = EStatus::Started;
return std::make_unique<TImportActor>(context.GetSessionSelfPtr(), context.GetAdapter());
}

void TSession::Finish() {
AFL_VERIFY(Status == EStatus::Started);
Status = EStatus::Finished;
}

const TInternalPathId TSession::GetInternalPathId() const {
return Task->GetInternalPathId();
}

const TImportTask &TSession::GetTask() const {
return *Task;
}

bool TSession::IsStarted() const {
return Status == EStatus::Started;
}

void TSession::Abort() {
AFL_VERIFY(Status != EStatus::Finished && Status != EStatus::Aborted);
Status = EStatus::Aborted;
}

void TSession::Confirm() {
AFL_VERIFY(IsDraft());
Status = EStatus::Confirmed;
}

bool TSession::IsDraft() const {
return Status == EStatus::Draft;
}

TString TSession::DebugString() const {
return TStringBuilder() << "task=" << Task->DebugString()
<< ";status=" << Status;
}

bool TSession::IsConfirmed() const {
return Status == EStatus::Confirmed;
}

TSession::TSession(const std::shared_ptr<TImportTask> &task) : Task(task) {
AFL_VERIFY(Task);
}

TString TSession::GetClassName() const {
return GetClassNameStatic();
}

bool TSession::IsReadyForRemoveOnFinished() const {
return Status == EStatus::Aborted;
}

bool TSession::IsFinished() const {
return Status == EStatus::Finished;
}

bool TSession::IsReadyForStart() const {
return Status == EStatus::Confirmed;
}

std::optional<ui64> TSession::GetTxId() const {
return Task->GetTxId();
}

TSession::TProtoLogic TSession::DoSerializeToProto() const {
TProtoLogic result;
*result.MutableTask() = Task->SerializeToProto();
return result;
}

TConclusionStatus TSession::DoDeserializeFromProto(const TProtoLogic &proto) {
Task = std::make_shared<TImportTask>();
return Task->DeserializeFromProto(proto.GetTask());
}

TSession::TProtoState TSession::DoSerializeStateToProto() const {
TProtoState result;
result.SetStatus(::ToString(Status));
return result;
}

TConclusionStatus TSession::DoDeserializeStateFromProto(const TProtoState &proto) {
if (!TryFromString(proto.GetStatus(), Status)) {
return TConclusionStatus::Fail("cannot read status from proto: " +
proto.GetStatus());
}
return TConclusionStatus::Success();
}

TSession::TProtoProgress TSession::DoSerializeProgressToProto() const {
return NProtoBuf::Empty{};
}

TConclusionStatus TSession::DoDeserializeProgressFromProto(const TProtoProgress & /* proto */) {
return TConclusionStatus::Success();
}

TString TSession::GetClassNameStatic() {
return "CS::IMPORT";
}

} // namespace NKikimr::NOlap::NImport
Loading
Loading