Skip to content

Commit 9d90220

Browse files
LiuRuoyu01chuandew
authored andcommitted
[fix][sdk]Fixup distinguish different coordinator service
1 parent d7b839f commit 9d90220

21 files changed

+125
-108
lines changed

setup.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,7 @@ def build_extension(self, ext: CMakeExtension) -> None:
139139
"""
140140
setup(
141141
name="dingosdk",
142-
version="0.3rc1.post7",
142+
version="0.3rc1.post8",
143143
author="DingoDB",
144144
author_email="[email protected]",
145145
description="dingo-store python sdk",

src/sdk/admin_tool.cc

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ Status AdminTool::CreateTableIds(int64_t count, std::vector<int64_t>& out_table_
6464

6565
rpc.MutableRequest()->set_count(count);
6666

67-
Status ret = stub_.GetMetaRpcController()->SyncCall(rpc);
67+
Status ret = stub_.GetCoordinatorRpcController()->SyncCall(rpc);
6868
if (!ret.ok()) {
6969
return ret;
7070
}
@@ -89,7 +89,7 @@ Status AdminTool::DropIndex(int64_t index_id) {
8989
index_pb->set_parent_entity_id(::dingodb::pb::meta::ReservedSchemaIds::DINGO_SCHEMA);
9090
index_pb->set_entity_id(index_id);
9191

92-
Status s = stub_.GetMetaRpcController()->SyncCall(rpc);
92+
Status s = stub_.GetCoordinatorRpcController()->SyncCall(rpc);
9393
if (s.IsNotFound()) {
9494
s = Status::OK();
9595
}

src/sdk/auto_increment_manager.cc

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ Status AutoIncrementer::UpdateAutoIncrementId(int64_t start_id) {
8282
return RunOperation([this, start_id]() {
8383
UpdateAutoIncrementRpc rpc;
8484
PrepareUpdateAutoIncrementRequest(*rpc.MutableRequest(), start_id);
85-
DINGO_RETURN_NOT_OK(stub_.GetMetaRpcController()->SyncCall(rpc));
85+
DINGO_RETURN_NOT_OK(stub_.GetAutoIncrementerRpcController()->SyncCall(rpc));
8686
VLOG(kSdkVlogLevel) << "UpdateAutoIncrement request:" << rpc.Request()->ShortDebugString()
8787
<< " response:" << rpc.Response()->ShortDebugString();
8888
id_cache_.clear();
@@ -115,7 +115,7 @@ Status AutoIncrementer::RefillCache() {
115115
GenerateAutoIncrementRpc rpc;
116116
PrepareGenerateAutoIncrementRequest(*rpc.MutableRequest());
117117

118-
DINGO_RETURN_NOT_OK(stub_.GetMetaRpcController()->SyncCall(rpc));
118+
DINGO_RETURN_NOT_OK(stub_.GetAutoIncrementerRpcController()->SyncCall(rpc));
119119
VLOG(kSdkVlogLevel) << "GenerateAutoIncrement request:" << rpc.Request()->ShortDebugString()
120120
<< " response:" << rpc.Response()->ShortDebugString();
121121
// TODO: maybe not crash just return error msg

src/sdk/client_stub.cc

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,8 +45,11 @@ Status ClientStub::Open(const std::vector<EndPoint>& endpoints) {
4545
coordinator_rpc_controller_ = std::make_shared<CoordinatorRpcController>(*this);
4646
coordinator_rpc_controller_->Open(endpoints);
4747

48-
meta_rpc_controller_ = std::make_shared<CoordinatorRpcController>(*this);
49-
meta_rpc_controller_->Open(endpoints);
48+
tso_rpc_controller_ = std::make_shared<CoordinatorRpcController>(*this);
49+
tso_rpc_controller_->Open(endpoints);
50+
51+
auto_incrementer_rpc_controller_ = std::make_shared<CoordinatorRpcController>(*this);
52+
auto_incrementer_rpc_controller_->Open(endpoints);
5053

5154
version_rpc_controller_ = std::make_shared<CoordinatorRpcController>(*this);
5255
version_rpc_controller_->Open(endpoints);

src/sdk/client_stub.h

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -50,9 +50,14 @@ class ClientStub {
5050
return coordinator_rpc_controller_;
5151
}
5252

53-
virtual std::shared_ptr<CoordinatorRpcController> GetMetaRpcController() const {
54-
DCHECK_NOTNULL(meta_rpc_controller_.get());
55-
return meta_rpc_controller_;
53+
virtual std::shared_ptr<CoordinatorRpcController> GetTsoRpcController() const {
54+
DCHECK_NOTNULL(tso_rpc_controller_.get());
55+
return tso_rpc_controller_;
56+
}
57+
58+
virtual std::shared_ptr<CoordinatorRpcController> GetAutoIncrementerRpcController() const {
59+
DCHECK_NOTNULL(auto_incrementer_rpc_controller_.get());
60+
return auto_incrementer_rpc_controller_;
5661
}
5762

5863
virtual std::shared_ptr<CoordinatorRpcController> GetVersionRpcController() const {
@@ -128,7 +133,8 @@ class ClientStub {
128133
private:
129134
// TODO: use unique ptr
130135
std::shared_ptr<CoordinatorRpcController> coordinator_rpc_controller_;
131-
std::shared_ptr<CoordinatorRpcController> meta_rpc_controller_;
136+
std::shared_ptr<CoordinatorRpcController> tso_rpc_controller_;
137+
std::shared_ptr<CoordinatorRpcController> auto_incrementer_rpc_controller_;
132138
std::shared_ptr<CoordinatorRpcController> version_rpc_controller_;
133139
std::shared_ptr<MetaCache> meta_cache_;
134140
std::shared_ptr<RpcClient> rpc_client_;

src/sdk/coordinator.cc

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ Status Coordinator::CreateAutoIncrement(int64_t table_id, int64_t start_id) {
5656

5757
mut_request->set_start_id(start_id);
5858

59-
Status status = stub_.GetMetaRpcController()->SyncCall(rpc);
59+
Status status = stub_.GetAutoIncrementerRpcController()->SyncCall(rpc);
6060
if (!status.IsOK()) {
6161
DINGO_LOG(ERROR) << fmt::format("Create auto increment fail, error: {} {}", status.Errno(), status.ToString());
6262
return status;
@@ -73,7 +73,7 @@ Status Coordinator::DeleteAutoIncrement(int64_t table_id) {
7373
mut_table_id->set_entity_id(table_id);
7474
mut_table_id->set_parent_entity_id(0);
7575

76-
Status status = stub_.GetMetaRpcController()->SyncCall(rpc);
76+
Status status = stub_.GetAutoIncrementerRpcController()->SyncCall(rpc);
7777
if (!status.IsOK()) {
7878
DINGO_LOG(ERROR) << fmt::format("Delete auto increment fail, error: {} {}", status.Errno(), status.ToString());
7979
return status;
@@ -93,7 +93,7 @@ Status Coordinator::UpdateAutoIncrement(int64_t table_id, int64_t start_id) {
9393
mut_request->set_start_id(start_id);
9494
mut_request->set_force(false);
9595

96-
Status status = stub_.GetMetaRpcController()->SyncCall(rpc);
96+
Status status = stub_.GetAutoIncrementerRpcController()->SyncCall(rpc);
9797
if (!status.IsOK()) {
9898
DINGO_LOG(ERROR) << fmt::format("Update auto increment fail, error: {} {}", status.Errno(), status.ToString());
9999
return status;
@@ -114,7 +114,7 @@ Status Coordinator::GenerateAutoIncrement(int64_t table_id, int64_t count, int64
114114
mut_request->set_auto_increment_increment(1);
115115
mut_request->set_auto_increment_offset(1);
116116

117-
Status status = stub_.GetMetaRpcController()->SyncCall(rpc);
117+
Status status = stub_.GetAutoIncrementerRpcController()->SyncCall(rpc);
118118
if (!status.IsOK()) {
119119
DINGO_LOG(ERROR) << fmt::format("Generate auto increment fail, error: {} {}", status.Errno(), status.ToString());
120120
return status;
@@ -134,7 +134,7 @@ Status Coordinator::GetAutoIncrement(int64_t table_id, int64_t& start_id) {
134134
mut_table_id->set_entity_id(table_id);
135135
mut_table_id->set_parent_entity_id(0);
136136

137-
Status status = stub_.GetMetaRpcController()->SyncCall(rpc);
137+
Status status = stub_.GetAutoIncrementerRpcController()->SyncCall(rpc);
138138
if (!status.IsOK()) {
139139
if (status.Errno() == pb::error::EAUTO_INCREMENT_NOT_FOUND) {
140140
return Status::NotFound("auto increment not found");
@@ -150,7 +150,7 @@ Status Coordinator::GetAutoIncrement(int64_t table_id, int64_t& start_id) {
150150

151151
Status Coordinator::GetAutoIncrements(std::vector<TableIncrement>& table_increments) {
152152
GetAutoIncrementsRpc rpc;
153-
Status status = stub_.GetMetaRpcController()->SyncCall(rpc);
153+
Status status = stub_.GetAutoIncrementerRpcController()->SyncCall(rpc);
154154
if (!status.IsOK()) {
155155
DINGO_LOG(ERROR) << fmt::format("Get all auto increment fail, error: {} {}", status.Errno(), status.ToString());
156156
return status;

src/sdk/document/document_index_cache.cc

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,7 @@ Status DocumentIndexCache::SlowGetDocumentIndexByKey(const DocumentIndexCacheKey
118118
schema->set_entity_id(schema_id);
119119
rpc.MutableRequest()->set_index_name(index_name);
120120

121-
DINGO_RETURN_NOT_OK(stub_.GetMetaRpcController()->SyncCall(rpc));
121+
DINGO_RETURN_NOT_OK(stub_.GetCoordinatorRpcController()->SyncCall(rpc));
122122

123123
if (CheckIndexResponse(*rpc.Response())) {
124124
return ProcessIndexDefinitionWithId(rpc.Response()->index_definition_with_id(), out_doc_index);
@@ -134,7 +134,7 @@ Status DocumentIndexCache::SlowGetDocumentIndexById(int64_t index_id, std::share
134134
index_id_pb->set_parent_entity_id(::dingodb::pb::meta::ReservedSchemaIds::DINGO_SCHEMA);
135135
index_id_pb->set_entity_id(index_id);
136136

137-
DINGO_RETURN_NOT_OK(stub_.GetMetaRpcController()->SyncCall(rpc));
137+
DINGO_RETURN_NOT_OK(stub_.GetCoordinatorRpcController()->SyncCall(rpc));
138138

139139
if (CheckIndexResponse(*rpc.Response())) {
140140
return ProcessIndexDefinitionWithId(rpc.Response()->index_definition_with_id(), out_doc_index);

src/sdk/document/document_index_creator.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,7 @@ Status DocumentIndexCreator::Create(int64_t& out_index_id) {
120120

121121
out_index_id = new_index_id;
122122

123-
return data_->stub.GetMetaRpcController()->SyncCall(rpc);
123+
return data_->stub.GetCoordinatorRpcController()->SyncCall(rpc);
124124
}
125125

126126
} // namespace sdk

src/sdk/meta_member_info.cc

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,12 @@
1313
// limitations under the License.
1414

1515
#include "sdk/meta_member_info.h"
16+
#include <fmt/format.h>
1617

1718
#include <algorithm>
1819
#include <mutex>
1920

21+
#include "common/logging.h"
2022
#include "glog/logging.h"
2123
#include "sdk/utils/net_util.h"
2224

@@ -33,6 +35,7 @@ EndPoint MetaMemberInfo::PickNextLeader() {
3335
leader = leader_;
3436
} else {
3537
leader = members_[next_ % members_.size()];
38+
DINGO_LOG(INFO) << fmt::format("[sdk.meta]Pick next leader: {}", leader.ToString());
3639
next_++;
3740
}
3841
}

src/sdk/rpc/coordinator_rpc_controller.cc

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -68,8 +68,6 @@ bool CoordinatorRpcController::NeedPickLeader(Rpc& rpc) {
6868
void CoordinatorRpcController::PrepareRpc(Rpc& rpc) {
6969
if (NeedPickLeader(rpc)) {
7070
EndPoint next_leader = meta_member_info_.PickNextLeader();
71-
DINGO_LOG(INFO) << fmt::format("[sdk.rpc.{}]Pick next leader: {}, rpc method: {}", rpc.LogId(),
72-
next_leader.ToString(), rpc.Method());
7371

7472
CHECK(next_leader.IsValid());
7573
rpc.SetEndPoint(next_leader);

0 commit comments

Comments
 (0)