Skip to content

Commit 53cf1b7

Browse files
authored
Merge 389ff3e into b8bc549
2 parents b8bc549 + 389ff3e commit 53cf1b7

File tree

5 files changed

+306
-0
lines changed

5 files changed

+306
-0
lines changed
Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
#include "import_downloader.h"
2+
3+
#include <ydb/core/tx/datashard/datashard.h>
4+
#include <ydb/core/tx/datashard/datashard_impl.h>
5+
#include <ydb/core/formats/arrow/arrow_batch_builder.h>
6+
7+
namespace NKikimr::NColumnShard::NBackup {
8+
9+
TConclusion<std::unique_ptr<NActors::IActor>> CreateAsyncJobImportDownloader(const NActors::TActorId& subscriberActorId, ui64 txId, const NKikimrSchemeOp::TRestoreTask& restoreTask, const NKikimr::NDataShard::TTableInfo& tableInfo) {
10+
const auto settingsKind = restoreTask.GetSettingsCase();
11+
switch (settingsKind) {
12+
case NKikimrSchemeOp::TRestoreTask::kS3Settings:
13+
#ifndef KIKIMR_DISABLE_S3_OPS
14+
return std::unique_ptr<NActors::IActor>(CreateS3Downloader(subscriberActorId, txId, restoreTask, tableInfo));
15+
#else
16+
return TConclusionStatus::Fail("Exports to S3 are disabled");
17+
#endif
18+
default:
19+
return TConclusionStatus::Fail(TStringBuilder() << "Unknown settings: " << static_cast<ui32>(settingsKind));
20+
}
21+
}
22+
23+
class TImportDownloader: public TActorBootstrapped<TImportDownloader> {
24+
public:
25+
TImportDownloader(const NActors::TActorId& subscriberActorId, ui64 txId, const NKikimrSchemeOp::TRestoreTask& restoreTask, const NKikimr::NDataShard::TTableInfo& tableInfo)
26+
: SubscriberActorId(subscriberActorId)
27+
, TxId(txId)
28+
, RestoreTask(restoreTask)
29+
, TableInfo(tableInfo) {
30+
}
31+
32+
void Bootstrap() {
33+
Register(CreateAsyncJobImportDownloader(SelfId(), TxId, RestoreTask, TableInfo).DetachResult().release());
34+
Become(&TThis::StateMain);
35+
}
36+
37+
STRICT_STFUNC(
38+
StateMain,
39+
hFunc(NKikimr::TEvDataShard::TEvGetS3DownloadInfo, Handle)
40+
hFunc(NKikimr::TEvDataShard::TEvStoreS3DownloadInfo, Handle)
41+
hFunc(NKikimr::TEvDataShard::TEvS3UploadRowsRequest, Handle)
42+
hFunc(NDataShard::TDataShard::TEvPrivate::TEvAsyncJobComplete, Handle)
43+
)
44+
45+
void Handle(NKikimr::TEvDataShard::TEvGetS3DownloadInfo::TPtr& ev) {
46+
Cerr << "TEvGetS3DownloadInfo: " << ev->Get()->ToString() << Endl;
47+
Send(ev->Sender, new NKikimr::TEvDataShard::TEvS3DownloadInfo());
48+
Y_UNUSED(ev);
49+
50+
}
51+
52+
void Handle(NKikimr::TEvDataShard::TEvStoreS3DownloadInfo::TPtr& ev) {
53+
Send(ev->Sender, new NKikimr::TEvDataShard::TEvS3DownloadInfo(ev->Get()->Info));
54+
Y_UNUSED(ev);
55+
}
56+
57+
TVector<std::pair<TString, NScheme::TTypeInfo>> MakeYdbSchema() {
58+
return {{"key", NScheme::TTypeInfo(NScheme::NTypeIds::String)}, {"value", NScheme::TTypeInfo(NScheme::NTypeIds::String)}};
59+
}
60+
61+
void Handle(NKikimr::TEvDataShard::TEvS3UploadRowsRequest::TPtr& ev) {
62+
TSerializedCellVec keyCells;
63+
TSerializedCellVec valueCells;
64+
65+
NArrow::TArrowBatchBuilder batchBuilder;
66+
const auto startStatus = batchBuilder.Start(MakeYdbSchema());
67+
if (!startStatus.ok()) {
68+
/* TODO: error handling */
69+
}
70+
71+
72+
for (const auto& r : ev->Get()->Record.GetRows()) {
73+
// TODO: use safe parsing!
74+
keyCells.Parse(r.GetKeyColumns());
75+
valueCells.Parse(r.GetValueColumns());
76+
batchBuilder.AddRow(keyCells.GetCells(), valueCells.GetCells());
77+
}
78+
79+
auto resultBatch = batchBuilder.FlushBatch(false);
80+
81+
auto response = new NKikimr::TEvDataShard::TEvS3UploadRowsResponse();
82+
response->Info = ev->Get()->Info;
83+
Send(ev->Sender, response);
84+
Y_UNUSED(ev);
85+
}
86+
87+
void Handle(NDataShard::TDataShard::TEvPrivate::TEvAsyncJobComplete::TPtr& ev) {
88+
Y_UNUSED(ev);
89+
}
90+
91+
private:
92+
NActors::TActorId SubscriberActorId;
93+
ui64 TxId;
94+
NKikimrSchemeOp::TRestoreTask RestoreTask;
95+
NKikimr::NDataShard::TTableInfo TableInfo;
96+
};
97+
98+
99+
std::unique_ptr<NActors::IActor> CreateImportDownloaderImport(const NActors::TActorId& subscriberActorId, ui64 txId, const NKikimrSchemeOp::TRestoreTask& restoreTask, const NKikimr::NDataShard::TTableInfo& tableInfo) {
100+
return std::make_unique<TImportDownloader>(subscriberActorId, txId, restoreTask, tableInfo);
101+
}
102+
103+
}
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
#include <ydb/library/actors/core/actor_bootstrapped.h>
2+
#include <ydb/core/tx/datashard/import_common.h>
3+
#include <ydb/library/conclusion/result.h>
4+
#include <ydb/core/tx/datashard/import_s3.h>
5+
6+
namespace NKikimr::NColumnShard::NBackup {
7+
8+
TConclusion<std::unique_ptr<NActors::IActor>> CreateAsyncJobImportDownloader(const NActors::TActorId& subscriberActorId, ui64 txId, const NKikimrSchemeOp::TRestoreTask& restoreTask, const NKikimr::NDataShard::TTableInfo& tableInfo);
9+
10+
std::unique_ptr<NActors::IActor> CreateImportDownloaderImport(const NActors::TActorId& subscriberActorId, ui64 txId, const NKikimrSchemeOp::TRestoreTask& restoreTask, const NKikimr::NDataShard::TTableInfo& tableInfo);
11+
12+
}
Lines changed: 149 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,149 @@
1+
#include <library/cpp/testing/hook/hook.h>
2+
#include <library/cpp/testing/unittest/registar.h>
3+
4+
#include <contrib/libs/apache/arrow/cpp/src/arrow/array/builder_binary.h>
5+
#include <contrib/libs/apache/arrow/cpp/src/arrow/result.h>
6+
7+
#include <ydb/library/testlib/s3_recipe_helper/s3_recipe_helper.h>
8+
9+
#include <ydb/apps/ydbd/export/export.h>
10+
#include <ydb/core/protos/flat_scheme_op.pb.h>
11+
#include <ydb/core/protos/s3_settings.pb.h>
12+
#include <ydb/core/testlib/basics/runtime.h>
13+
#include <ydb/core/testlib/tablet_helpers.h>
14+
#include <ydb/core/tx/columnshard/backup/async_jobs/import_downloader.h>
15+
#include <ydb/core/tx/columnshard/backup/iscan/iscan.h>
16+
#include <ydb/core/tx/columnshard/columnshard_private_events.h>
17+
18+
19+
namespace NKikimr {
20+
21+
namespace {
22+
23+
using TRuntimePtr = std::shared_ptr<TTestActorRuntime>;
24+
25+
std::shared_ptr<arrow::RecordBatch> TestRecordBatch() {
26+
std::vector<std::string> keys = {"foo", "bar", "baz"};
27+
std::vector<std::string> values = {"one", "two", "three"};
28+
29+
arrow::StringBuilder key_builder;
30+
for (const auto& k : keys) {
31+
Y_UNUSED(key_builder.Append(k));
32+
}
33+
std::shared_ptr<arrow::Array> key_array;
34+
Y_UNUSED(key_builder.Finish(&key_array));
35+
36+
arrow::StringBuilder value_builder;
37+
for (const auto& v : values) {
38+
Y_UNUSED(value_builder.Append(v));
39+
}
40+
std::shared_ptr<arrow::Array> value_array;
41+
Y_UNUSED(value_builder.Finish(&value_array));
42+
43+
auto schema = arrow::schema({
44+
arrow::field("key", arrow::binary()),
45+
arrow::field("value", arrow::binary())
46+
});
47+
48+
return arrow::RecordBatch::Make(schema, keys.size(), {key_array, value_array});
49+
}
50+
51+
NDataShard::IExport::TTableColumns MakeYdbColumns() {
52+
NDataShard::IExport::TTableColumns columns;
53+
columns[0] = NDataShard::TUserTable::TUserColumn(NScheme::TTypeInfo(NScheme::NTypeIds::String), "", "key", true);
54+
columns[1] = NDataShard::TUserTable::TUserColumn(NScheme::TTypeInfo(NScheme::NTypeIds::String), "", "value", false);
55+
return columns;
56+
}
57+
58+
NKikimrSchemeOp::TBackupTask MakeBackupTask(const TString& bucketName) {
59+
NKikimrSchemeOp::TBackupTask backupTask;
60+
backupTask.SetEnablePermissions(true);
61+
auto& s3Settings = *backupTask.MutableS3Settings();
62+
s3Settings.SetBucket(bucketName);
63+
s3Settings.SetEndpoint(GetEnv("S3_ENDPOINT"));
64+
auto& table = *backupTask.MutableTable();
65+
auto& tableDescription = *table.MutableColumnTableDescription();
66+
tableDescription.SetColumnShardCount(4);
67+
auto& col1 = *tableDescription.MutableSchema()->MutableColumns()->Add();
68+
col1.SetName("key");
69+
col1.SetType("String");
70+
71+
auto& col2 = *tableDescription.MutableSchema()->MutableColumns()->Add();
72+
col2.SetName("value");
73+
col2.SetType("String");
74+
table.MutableSelf();
75+
return backupTask;
76+
}
77+
78+
NKikimrSchemeOp::TRestoreTask MakeRestoreTask(const TString& bucketName) {
79+
NKikimrSchemeOp::TRestoreTask restoreTask;
80+
auto& s3Settings = *restoreTask.MutableS3Settings();
81+
s3Settings.SetBucket(bucketName);
82+
s3Settings.SetEndpoint(GetEnv("S3_ENDPOINT"));
83+
auto& description = *restoreTask.MutableTableDescription();
84+
auto& col1 = *description.AddColumns();
85+
col1.SetName("key");
86+
col1.SetType("String");
87+
col1.SetId(1);
88+
col1.SetTypeId(NScheme::NTypeIds::String);
89+
auto& col2 = *description.AddColumns();
90+
col2.SetName("value");
91+
col2.SetType("String");
92+
col2.SetId(2);
93+
col2.SetTypeId(NScheme::NTypeIds::String);
94+
description.AddKeyColumnNames("key");
95+
description.AddKeyColumnIds(1);
96+
return restoreTask;
97+
}
98+
99+
}
100+
101+
using namespace NColumnShard;
102+
103+
Y_UNIT_TEST_SUITE(IScan) {
104+
105+
Y_UNIT_TEST(MultiExport) {
106+
Aws::S3::S3Client s3Client = NTestUtils::MakeS3Client();
107+
NTestUtils::CreateBucket("test2", s3Client);
108+
109+
TRuntimePtr runtime(new TTestBasicRuntime());
110+
runtime->SetLogPriority(NKikimrServices::DATASHARD_BACKUP, NActors::NLog::PRI_DEBUG);
111+
runtime->SetLogPriority(NKikimrServices::DATASHARD_RESTORE, NActors::NLog::PRI_DEBUG);
112+
SetupTabletServices(*runtime);
113+
114+
const auto edge = runtime->AllocateEdgeActor(0);
115+
auto exportFactory = std::make_shared<TDataShardExportFactory>();
116+
auto actor = NKikimr::NColumnShard::NBackup::CreateExportUploaderActor(edge, MakeBackupTask("test2"), exportFactory.get(), MakeYdbColumns(), 0);
117+
auto exporter = runtime->Register(actor.release());
118+
119+
TAutoPtr<IEventHandle> handle;
120+
runtime->DispatchEvents({}, TDuration::Seconds(1));
121+
runtime->Send(new IEventHandle(exporter, edge, new NColumnShard::TEvPrivate::TEvBackupExportRecordBatch(TestRecordBatch(), false)));
122+
runtime->Send(new IEventHandle(exporter, edge, new NColumnShard::TEvPrivate::TEvBackupExportRecordBatch(TestRecordBatch(), true)));
123+
auto event1 = runtime->GrabEdgeEvent<NColumnShard::TEvPrivate::TEvBackupExportRecordBatchResult>(handle);
124+
UNIT_ASSERT(!event1->IsFinish);
125+
auto event2 = runtime->GrabEdgeEvent<NColumnShard::TEvPrivate::TEvBackupExportRecordBatchResult>(handle);
126+
UNIT_ASSERT(event2->IsFinish);
127+
128+
runtime->DispatchEvents({}, TDuration::Seconds(5));
129+
std::vector<TString> result = NTestUtils::GetObjectKeys("test2", s3Client);
130+
UNIT_ASSERT_VALUES_EQUAL(NTestUtils::GetUncommittedUploadsCount("test2", s3Client), 0);
131+
UNIT_ASSERT_VALUES_EQUAL(JoinSeq(",", result), "data_00.csv,metadata.json,permissions.pb,scheme.pb");
132+
auto scheme = NTestUtils::GetObject("test2", "scheme.pb", s3Client);
133+
UNIT_ASSERT_VALUES_EQUAL(scheme, "columns {\n name: \"key\"\n type {\n optional_type {\n item {\n type_id: STRING\n }\n }\n }\n}\ncolumns {\n name: \"value\"\n type {\n optional_type {\n item {\n type_id: STRING\n }\n }\n }\n}\npartitioning_settings {\n min_partitions_count: 4\n}\nstore_type: STORE_TYPE_COLUMN\n");
134+
auto metadata = NTestUtils::GetObject("test2", "metadata.json", s3Client);
135+
UNIT_ASSERT_VALUES_EQUAL(metadata, "{\"version\":0,\"full_backups\":[{\"snapshot_vts\":[0,0]}],\"permissions\":1,\"changefeeds\":[]}");
136+
auto data = NTestUtils::GetObject("test2", "data_00.csv", s3Client);
137+
UNIT_ASSERT_VALUES_EQUAL(data, "\"foo\",\"one\"\n\"bar\",\"two\"\n\"baz\",\"three\"\n\"foo\",\"one\"\n\"bar\",\"two\"\n\"baz\",\"three\"\n");
138+
139+
140+
auto restoreTask = MakeRestoreTask("test2");
141+
auto userTable = MakeIntrusiveConst<NDataShard::TUserTable>(ui32(0), restoreTask.GetTableDescription(), ui32(0));
142+
143+
auto importActor = NKikimr::NColumnShard::NBackup::CreateImportDownloaderImport(edge, 0, restoreTask, NKikimr::NDataShard::TTableInfo{0, userTable});
144+
runtime->Register(importActor.release());
145+
runtime->DispatchEvents({}, TDuration::Seconds(1));
146+
}
147+
}
148+
149+
} // namespace NKikimr
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
UNITTEST_FOR(ydb/core/tx/columnshard/backup/async_jobs)
2+
3+
PEERDIR(
4+
library/cpp/getopt
5+
library/cpp/regex/pcre
6+
library/cpp/svnversion
7+
ydb/apps/ydbd/export
8+
ydb/core/testlib/default
9+
ydb/core/tx
10+
ydb/core/tx/columnshard/hooks/abstract
11+
ydb/core/tx/columnshard/hooks/testing
12+
ydb/core/tx/columnshard/test_helper
13+
ydb/library/aclib/protos
14+
ydb/public/lib/yson_value
15+
ydb/services/metadata
16+
ydb/library/testlib/s3_recipe_helper
17+
)
18+
19+
YQL_LAST_ABI_VERSION()
20+
21+
INCLUDE(${ARCADIA_ROOT}/ydb/tests/tools/s3_recipe/recipe.inc)
22+
23+
SRCS(
24+
ut_import_downloader.cpp
25+
)
26+
27+
END()
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
LIBRARY()
2+
3+
SRCS(
4+
import_downloader.cpp
5+
)
6+
7+
PEERDIR(
8+
ydb/core/formats/arrow
9+
ydb/library/actors/core
10+
)
11+
12+
YQL_LAST_ABI_VERSION()
13+
14+
15+
END()

0 commit comments

Comments
 (0)