diff --git a/setup.py b/setup.py index cefc68e..ff551d5 100644 --- a/setup.py +++ b/setup.py @@ -139,7 +139,7 @@ def build_extension(self, ext: CMakeExtension) -> None: """ setup( name="dingosdk", - version="0.3rc1.post7", + version="0.3rc1.post8", author="DingoDB", author_email="dingodb@zetyun.com", description="dingo-store python sdk", diff --git a/src/sdk/admin_tool.cc b/src/sdk/admin_tool.cc index e7591b1..c1afc2c 100644 --- a/src/sdk/admin_tool.cc +++ b/src/sdk/admin_tool.cc @@ -64,7 +64,7 @@ Status AdminTool::CreateTableIds(int64_t count, std::vector& out_table_ rpc.MutableRequest()->set_count(count); - Status ret = stub_.GetMetaRpcController()->SyncCall(rpc); + Status ret = stub_.GetCoordinatorRpcController()->SyncCall(rpc); if (!ret.ok()) { return ret; } @@ -89,7 +89,7 @@ Status AdminTool::DropIndex(int64_t index_id) { index_pb->set_parent_entity_id(::dingodb::pb::meta::ReservedSchemaIds::DINGO_SCHEMA); index_pb->set_entity_id(index_id); - Status s = stub_.GetMetaRpcController()->SyncCall(rpc); + Status s = stub_.GetCoordinatorRpcController()->SyncCall(rpc); if (s.IsNotFound()) { s = Status::OK(); } diff --git a/src/sdk/auto_increment_manager.cc b/src/sdk/auto_increment_manager.cc index cc6c3c1..6369a5b 100644 --- a/src/sdk/auto_increment_manager.cc +++ b/src/sdk/auto_increment_manager.cc @@ -82,7 +82,7 @@ Status AutoIncrementer::UpdateAutoIncrementId(int64_t start_id) { return RunOperation([this, start_id]() { UpdateAutoIncrementRpc rpc; PrepareUpdateAutoIncrementRequest(*rpc.MutableRequest(), start_id); - DINGO_RETURN_NOT_OK(stub_.GetMetaRpcController()->SyncCall(rpc)); + DINGO_RETURN_NOT_OK(stub_.GetAutoIncrementerRpcController()->SyncCall(rpc)); VLOG(kSdkVlogLevel) << "UpdateAutoIncrement request:" << rpc.Request()->ShortDebugString() << " response:" << rpc.Response()->ShortDebugString(); id_cache_.clear(); @@ -115,7 +115,7 @@ Status AutoIncrementer::RefillCache() { GenerateAutoIncrementRpc rpc; PrepareGenerateAutoIncrementRequest(*rpc.MutableRequest()); - DINGO_RETURN_NOT_OK(stub_.GetMetaRpcController()->SyncCall(rpc)); + DINGO_RETURN_NOT_OK(stub_.GetAutoIncrementerRpcController()->SyncCall(rpc)); VLOG(kSdkVlogLevel) << "GenerateAutoIncrement request:" << rpc.Request()->ShortDebugString() << " response:" << rpc.Response()->ShortDebugString(); // TODO: maybe not crash just return error msg diff --git a/src/sdk/client_stub.cc b/src/sdk/client_stub.cc index 5389983..60e067e 100644 --- a/src/sdk/client_stub.cc +++ b/src/sdk/client_stub.cc @@ -45,8 +45,11 @@ Status ClientStub::Open(const std::vector& endpoints) { coordinator_rpc_controller_ = std::make_shared(*this); coordinator_rpc_controller_->Open(endpoints); - meta_rpc_controller_ = std::make_shared(*this); - meta_rpc_controller_->Open(endpoints); + tso_rpc_controller_ = std::make_shared(*this); + tso_rpc_controller_->Open(endpoints); + + auto_incrementer_rpc_controller_ = std::make_shared(*this); + auto_incrementer_rpc_controller_->Open(endpoints); version_rpc_controller_ = std::make_shared(*this); version_rpc_controller_->Open(endpoints); diff --git a/src/sdk/client_stub.h b/src/sdk/client_stub.h index 2e37486..addfa8b 100644 --- a/src/sdk/client_stub.h +++ b/src/sdk/client_stub.h @@ -50,9 +50,14 @@ class ClientStub { return coordinator_rpc_controller_; } - virtual std::shared_ptr GetMetaRpcController() const { - DCHECK_NOTNULL(meta_rpc_controller_.get()); - return meta_rpc_controller_; + virtual std::shared_ptr GetTsoRpcController() const { + DCHECK_NOTNULL(tso_rpc_controller_.get()); + return tso_rpc_controller_; + } + + virtual std::shared_ptr GetAutoIncrementerRpcController() const { + DCHECK_NOTNULL(auto_incrementer_rpc_controller_.get()); + return auto_incrementer_rpc_controller_; } virtual std::shared_ptr GetVersionRpcController() const { @@ -128,7 +133,8 @@ class ClientStub { private: // TODO: use unique ptr std::shared_ptr coordinator_rpc_controller_; - std::shared_ptr meta_rpc_controller_; + std::shared_ptr tso_rpc_controller_; + std::shared_ptr auto_incrementer_rpc_controller_; std::shared_ptr version_rpc_controller_; std::shared_ptr meta_cache_; std::shared_ptr rpc_client_; diff --git a/src/sdk/coordinator.cc b/src/sdk/coordinator.cc index 92e6758..d46451a 100644 --- a/src/sdk/coordinator.cc +++ b/src/sdk/coordinator.cc @@ -56,7 +56,7 @@ Status Coordinator::CreateAutoIncrement(int64_t table_id, int64_t start_id) { mut_request->set_start_id(start_id); - Status status = stub_.GetMetaRpcController()->SyncCall(rpc); + Status status = stub_.GetAutoIncrementerRpcController()->SyncCall(rpc); if (!status.IsOK()) { DINGO_LOG(ERROR) << fmt::format("Create auto increment fail, error: {} {}", status.Errno(), status.ToString()); return status; @@ -73,7 +73,7 @@ Status Coordinator::DeleteAutoIncrement(int64_t table_id) { mut_table_id->set_entity_id(table_id); mut_table_id->set_parent_entity_id(0); - Status status = stub_.GetMetaRpcController()->SyncCall(rpc); + Status status = stub_.GetAutoIncrementerRpcController()->SyncCall(rpc); if (!status.IsOK()) { DINGO_LOG(ERROR) << fmt::format("Delete auto increment fail, error: {} {}", status.Errno(), status.ToString()); return status; @@ -93,7 +93,7 @@ Status Coordinator::UpdateAutoIncrement(int64_t table_id, int64_t start_id) { mut_request->set_start_id(start_id); mut_request->set_force(false); - Status status = stub_.GetMetaRpcController()->SyncCall(rpc); + Status status = stub_.GetAutoIncrementerRpcController()->SyncCall(rpc); if (!status.IsOK()) { DINGO_LOG(ERROR) << fmt::format("Update auto increment fail, error: {} {}", status.Errno(), status.ToString()); return status; @@ -114,7 +114,7 @@ Status Coordinator::GenerateAutoIncrement(int64_t table_id, int64_t count, int64 mut_request->set_auto_increment_increment(1); mut_request->set_auto_increment_offset(1); - Status status = stub_.GetMetaRpcController()->SyncCall(rpc); + Status status = stub_.GetAutoIncrementerRpcController()->SyncCall(rpc); if (!status.IsOK()) { DINGO_LOG(ERROR) << fmt::format("Generate auto increment fail, error: {} {}", status.Errno(), status.ToString()); return status; @@ -134,7 +134,7 @@ Status Coordinator::GetAutoIncrement(int64_t table_id, int64_t& start_id) { mut_table_id->set_entity_id(table_id); mut_table_id->set_parent_entity_id(0); - Status status = stub_.GetMetaRpcController()->SyncCall(rpc); + Status status = stub_.GetAutoIncrementerRpcController()->SyncCall(rpc); if (!status.IsOK()) { if (status.Errno() == pb::error::EAUTO_INCREMENT_NOT_FOUND) { return Status::NotFound("auto increment not found"); @@ -150,7 +150,7 @@ Status Coordinator::GetAutoIncrement(int64_t table_id, int64_t& start_id) { Status Coordinator::GetAutoIncrements(std::vector& table_increments) { GetAutoIncrementsRpc rpc; - Status status = stub_.GetMetaRpcController()->SyncCall(rpc); + Status status = stub_.GetAutoIncrementerRpcController()->SyncCall(rpc); if (!status.IsOK()) { DINGO_LOG(ERROR) << fmt::format("Get all auto increment fail, error: {} {}", status.Errno(), status.ToString()); return status; diff --git a/src/sdk/document/document_index_cache.cc b/src/sdk/document/document_index_cache.cc index c82a3ed..e2a8a84 100644 --- a/src/sdk/document/document_index_cache.cc +++ b/src/sdk/document/document_index_cache.cc @@ -118,7 +118,7 @@ Status DocumentIndexCache::SlowGetDocumentIndexByKey(const DocumentIndexCacheKey schema->set_entity_id(schema_id); rpc.MutableRequest()->set_index_name(index_name); - DINGO_RETURN_NOT_OK(stub_.GetMetaRpcController()->SyncCall(rpc)); + DINGO_RETURN_NOT_OK(stub_.GetCoordinatorRpcController()->SyncCall(rpc)); if (CheckIndexResponse(*rpc.Response())) { return ProcessIndexDefinitionWithId(rpc.Response()->index_definition_with_id(), out_doc_index); @@ -134,7 +134,7 @@ Status DocumentIndexCache::SlowGetDocumentIndexById(int64_t index_id, std::share index_id_pb->set_parent_entity_id(::dingodb::pb::meta::ReservedSchemaIds::DINGO_SCHEMA); index_id_pb->set_entity_id(index_id); - DINGO_RETURN_NOT_OK(stub_.GetMetaRpcController()->SyncCall(rpc)); + DINGO_RETURN_NOT_OK(stub_.GetCoordinatorRpcController()->SyncCall(rpc)); if (CheckIndexResponse(*rpc.Response())) { return ProcessIndexDefinitionWithId(rpc.Response()->index_definition_with_id(), out_doc_index); diff --git a/src/sdk/document/document_index_creator.cc b/src/sdk/document/document_index_creator.cc index 012ca19..39ab9dd 100644 --- a/src/sdk/document/document_index_creator.cc +++ b/src/sdk/document/document_index_creator.cc @@ -120,7 +120,7 @@ Status DocumentIndexCreator::Create(int64_t& out_index_id) { out_index_id = new_index_id; - return data_->stub.GetMetaRpcController()->SyncCall(rpc); + return data_->stub.GetCoordinatorRpcController()->SyncCall(rpc); } } // namespace sdk diff --git a/src/sdk/meta_member_info.cc b/src/sdk/meta_member_info.cc index 9a87cdd..fec46e9 100644 --- a/src/sdk/meta_member_info.cc +++ b/src/sdk/meta_member_info.cc @@ -13,10 +13,12 @@ // limitations under the License. #include "sdk/meta_member_info.h" +#include #include #include +#include "common/logging.h" #include "glog/logging.h" #include "sdk/utils/net_util.h" @@ -33,6 +35,7 @@ EndPoint MetaMemberInfo::PickNextLeader() { leader = leader_; } else { leader = members_[next_ % members_.size()]; + DINGO_LOG(INFO) << fmt::format("[sdk.meta]Pick next leader: {}", leader.ToString()); next_++; } } diff --git a/src/sdk/rpc/coordinator_rpc_controller.cc b/src/sdk/rpc/coordinator_rpc_controller.cc index e06823e..f9c1c5c 100644 --- a/src/sdk/rpc/coordinator_rpc_controller.cc +++ b/src/sdk/rpc/coordinator_rpc_controller.cc @@ -68,8 +68,6 @@ bool CoordinatorRpcController::NeedPickLeader(Rpc& rpc) { void CoordinatorRpcController::PrepareRpc(Rpc& rpc) { if (NeedPickLeader(rpc)) { EndPoint next_leader = meta_member_info_.PickNextLeader(); - DINGO_LOG(INFO) << fmt::format("[sdk.rpc.{}]Pick next leader: {}, rpc method: {}", rpc.LogId(), - next_leader.ToString(), rpc.Method()); CHECK(next_leader.IsValid()); rpc.SetEndPoint(next_leader); diff --git a/src/sdk/transaction/tso.cc b/src/sdk/transaction/tso.cc index fe62a4d..3f22aeb 100644 --- a/src/sdk/transaction/tso.cc +++ b/src/sdk/transaction/tso.cc @@ -124,7 +124,7 @@ Status TsoProvider::FetchTso(uint32_t count) { rpc.MutableRequest()->set_op_type(pb::meta::TsoOpType::OP_GEN_TSO); rpc.MutableRequest()->set_count(count); - auto status = stub_.GetMetaRpcController()->SyncCall(rpc); + auto status = stub_.GetTsoRpcController()->SyncCall(rpc); if (!status.IsOK()) { DINGO_LOG(ERROR) << fmt::format("[sdk.tso] fetch tso fail, status({}).", status.ToString()); return status; diff --git a/src/sdk/vector/vector_index_cache.cc b/src/sdk/vector/vector_index_cache.cc index dd5cb56..264dabe 100644 --- a/src/sdk/vector/vector_index_cache.cc +++ b/src/sdk/vector/vector_index_cache.cc @@ -118,7 +118,7 @@ Status VectorIndexCache::SlowGetVectorIndexByKey(const VectorIndexCacheKey& inde schema->set_entity_id(schema_id); rpc.MutableRequest()->set_index_name(index_name); - DINGO_RETURN_NOT_OK(stub_.GetMetaRpcController()->SyncCall(rpc)); + DINGO_RETURN_NOT_OK(stub_.GetCoordinatorRpcController()->SyncCall(rpc)); if (CheckIndexResponse(*rpc.Response())) { return ProcessIndexDefinitionWithId(rpc.Response()->index_definition_with_id(), out_vector_index); @@ -134,7 +134,7 @@ Status VectorIndexCache::SlowGetVectorIndexById(int64_t index_id, std::shared_pt index_id_pb->set_parent_entity_id(::dingodb::pb::meta::ReservedSchemaIds::DINGO_SCHEMA); index_id_pb->set_entity_id(index_id); - DINGO_RETURN_NOT_OK(stub_.GetMetaRpcController()->SyncCall(rpc)); + DINGO_RETURN_NOT_OK(stub_.GetCoordinatorRpcController()->SyncCall(rpc)); if (CheckIndexResponse(*rpc.Response())) { return ProcessIndexDefinitionWithId(rpc.Response()->index_definition_with_id(), out_vector_index); diff --git a/src/sdk/vector/vector_index_creator.cc b/src/sdk/vector/vector_index_creator.cc index 5dc1e64..ac6fcb7 100644 --- a/src/sdk/vector/vector_index_creator.cc +++ b/src/sdk/vector/vector_index_creator.cc @@ -205,7 +205,7 @@ Status VectorIndexCreator::Create(int64_t& out_index_id) { out_index_id = new_index_id; - return data_->stub.GetMetaRpcController()->SyncCall(rpc); + return data_->stub.GetCoordinatorRpcController()->SyncCall(rpc); } } // namespace sdk diff --git a/test/unit_test/sdk/mock_client_stub.h b/test/unit_test/sdk/mock_client_stub.h index ae0016a..ff7869b 100644 --- a/test/unit_test/sdk/mock_client_stub.h +++ b/test/unit_test/sdk/mock_client_stub.h @@ -28,7 +28,8 @@ class MockClientStub final : public ClientStub { ~MockClientStub() override = default; MOCK_METHOD(std::shared_ptr, GetCoordinatorRpcController, (), (const, override)); - MOCK_METHOD(std::shared_ptr, GetMetaRpcController, (), (const, override)); + MOCK_METHOD(std::shared_ptr, GetTsoRpcController, (), (const, override)); + MOCK_METHOD(std::shared_ptr, GetAutoIncrementerRpcController, (), (const, override)); MOCK_METHOD(std::shared_ptr, GetMetaCache, (), (const, override)); MOCK_METHOD(std::shared_ptr, GetRpcClient, (), (const, override)); MOCK_METHOD(std::shared_ptr, GetRawKvRegionScannerFactory, (), (const, override)); diff --git a/test/unit_test/sdk/test_auto_increment_manager.cc b/test/unit_test/sdk/test_auto_increment_manager.cc index e634793..2c6fee7 100644 --- a/test/unit_test/sdk/test_auto_increment_manager.cc +++ b/test/unit_test/sdk/test_auto_increment_manager.cc @@ -68,7 +68,7 @@ class SDKAutoInrementerTest : public TestBase { }; TEST_F(SDKAutoInrementerTest, CacheEmpty) { - EXPECT_CALL(*meta_rpc_controller, SyncCall).WillOnce([&](Rpc& rpc) { + EXPECT_CALL(*auto_incrementer_rpc_controller, SyncCall).WillOnce([&](Rpc& rpc) { auto* t_rpc = dynamic_cast(&rpc); EXPECT_EQ(t_rpc->Request()->count(), FLAGS_auto_incre_req_count); EXPECT_EQ(t_rpc->Request()->auto_increment_increment(), 1); @@ -87,7 +87,7 @@ TEST_F(SDKAutoInrementerTest, CacheEmpty) { } TEST_F(SDKAutoInrementerTest, FromCache) { - EXPECT_CALL(*meta_rpc_controller, SyncCall).WillOnce([&](Rpc& rpc) { + EXPECT_CALL(*auto_incrementer_rpc_controller, SyncCall).WillOnce([&](Rpc& rpc) { auto* t_rpc = dynamic_cast(&rpc); EXPECT_EQ(t_rpc->Request()->count(), FLAGS_auto_incre_req_count); EXPECT_EQ(t_rpc->Request()->auto_increment_increment(), 1); @@ -114,7 +114,7 @@ TEST_F(SDKAutoInrementerTest, FromCache) { } TEST_F(SDKAutoInrementerTest, FromCacheThenRefill) { - EXPECT_CALL(*meta_rpc_controller, SyncCall) + EXPECT_CALL(*auto_incrementer_rpc_controller, SyncCall) .WillOnce([&](Rpc& rpc) { auto* t_rpc = dynamic_cast(&rpc); EXPECT_EQ(t_rpc->Request()->count(), FLAGS_auto_incre_req_count); @@ -159,7 +159,7 @@ TEST_F(SDKAutoInrementerTest, FromCacheThenRefill) { } TEST_F(SDKAutoInrementerTest, MultiThreadGetNextId) { - EXPECT_CALL(*meta_rpc_controller, SyncCall) + EXPECT_CALL(*auto_incrementer_rpc_controller, SyncCall) .WillOnce([&](Rpc& rpc) { auto* t_rpc = dynamic_cast(&rpc); EXPECT_EQ(t_rpc->Request()->count(), FLAGS_auto_incre_req_count); @@ -205,7 +205,7 @@ TEST_F(SDKAutoInrementerTest, MultiThreadGetNextId) { } TEST_F(SDKAutoInrementerTest, CacheGetAutoId) { - EXPECT_CALL(*meta_rpc_controller, SyncCall).WillOnce([&](Rpc& rpc) { + EXPECT_CALL(*auto_incrementer_rpc_controller, SyncCall).WillOnce([&](Rpc& rpc) { auto* t_rpc = dynamic_cast(&rpc); EXPECT_EQ(t_rpc->Request()->count(), FLAGS_auto_incre_req_count); EXPECT_EQ(t_rpc->Request()->auto_increment_increment(), 1); @@ -225,7 +225,7 @@ TEST_F(SDKAutoInrementerTest, CacheGetAutoId) { } TEST_F(SDKAutoInrementerTest, CacheUpdate) { - EXPECT_CALL(*meta_rpc_controller, SyncCall) + EXPECT_CALL(*auto_incrementer_rpc_controller, SyncCall) .WillOnce([&](Rpc& rpc) { auto* t_rpc = dynamic_cast(&rpc); EXPECT_EQ(t_rpc->Request()->count(), FLAGS_auto_incre_req_count); @@ -276,7 +276,7 @@ TEST_F(SDKAutoInrementerTest, CacheUpdate) { } TEST_F(SDKAutoInrementerTest, MultiThreadUpdateId) { - EXPECT_CALL(*meta_rpc_controller, SyncCall) + EXPECT_CALL(*auto_incrementer_rpc_controller, SyncCall) .WillOnce([&](Rpc& rpc) { auto* t_rpc = dynamic_cast(&rpc); EXPECT_EQ(t_rpc->Request()->count(), FLAGS_auto_incre_req_count); diff --git a/test/unit_test/sdk/test_base.h b/test/unit_test/sdk/test_base.h index 1cd06ce..282d568 100644 --- a/test/unit_test/sdk/test_base.h +++ b/test/unit_test/sdk/test_base.h @@ -54,10 +54,15 @@ class TestBase : public ::testing::Test { EXPECT_CALL(*stub, GetCoordinatorRpcController).Times(testing::AnyNumber()); ON_CALL(*coordinator_rpc_controller, SyncCall).WillByDefault(testing::Return(Status::OK())); - meta_rpc_controller = std::make_shared(*stub); - ON_CALL(*stub, GetMetaRpcController).WillByDefault(testing::Return(meta_rpc_controller)); - EXPECT_CALL(*stub, GetMetaRpcController).Times(testing::AnyNumber()); - ON_CALL(*meta_rpc_controller, SyncCall).WillByDefault(testing::Return(Status::OK())); + tso_rpc_controller = std::make_shared(*stub); + ON_CALL(*stub, GetTsoRpcController).WillByDefault(testing::Return(tso_rpc_controller)); + EXPECT_CALL(*stub, GetTsoRpcController).Times(testing::AnyNumber()); + ON_CALL(*tso_rpc_controller, SyncCall).WillByDefault(testing::Return(Status::OK())); + + auto_incrementer_rpc_controller = std::make_shared(*stub); + ON_CALL(*stub, GetAutoIncrementerRpcController).WillByDefault(testing::Return(auto_incrementer_rpc_controller)); + EXPECT_CALL(*stub, GetAutoIncrementerRpcController).Times(testing::AnyNumber()); + ON_CALL(*auto_incrementer_rpc_controller, SyncCall).WillByDefault(testing::Return(Status::OK())); meta_cache = std::make_shared(coordinator_rpc_controller); ON_CALL(*stub, GetMetaCache).WillByDefault(testing::Return(meta_cache)); @@ -122,7 +127,7 @@ class TestBase : public ::testing::Test { if (txn_actuator) { txn_actuator->Stop(); } - + delete client; } @@ -144,7 +149,8 @@ class TestBase : public ::testing::Test { } std::shared_ptr coordinator_rpc_controller; - std::shared_ptr meta_rpc_controller; + std::shared_ptr tso_rpc_controller; + std::shared_ptr auto_incrementer_rpc_controller; std::shared_ptr meta_cache; std::shared_ptr rpc_client; std::shared_ptr region_scanner_factory; diff --git a/test/unit_test/sdk/transaction/test_txn_impl.cc b/test/unit_test/sdk/transaction/test_txn_impl.cc index 256903d..5458377 100644 --- a/test/unit_test/sdk/transaction/test_txn_impl.cc +++ b/test/unit_test/sdk/transaction/test_txn_impl.cc @@ -55,7 +55,7 @@ class SDKTxnImplTest : public TestBase { options.kind = kOptimistic; options.isolation = kSnapshotIsolation; - ON_CALL(*meta_rpc_controller, SyncCall).WillByDefault([&](Rpc& rpc) { + ON_CALL(*tso_rpc_controller, SyncCall).WillByDefault([&](Rpc& rpc) { auto* t_rpc = dynamic_cast(&rpc); EXPECT_EQ(t_rpc->Request()->op_type(), pb::meta::OP_GEN_TSO); t_rpc->MutableResponse()->set_count(FLAGS_tso_batch_size); @@ -65,7 +65,7 @@ class SDKTxnImplTest : public TestBase { return Status::OK(); }); - EXPECT_CALL(*meta_rpc_controller, SyncCall).Times(testing::AnyNumber()); + EXPECT_CALL(*tso_rpc_controller, SyncCall).Times(testing::AnyNumber()); ON_CALL(*txn_lock_resolver, ResolveLock).WillByDefault(testing::Return(Status::OK())); } @@ -74,7 +74,7 @@ class SDKTxnImplTest : public TestBase { }; TEST_F(SDKTxnImplTest, BeginFail) { - ON_CALL(*meta_rpc_controller, SyncCall).WillByDefault([&](Rpc& rpc) { + ON_CALL(*tso_rpc_controller, SyncCall).WillByDefault([&](Rpc& rpc) { auto* t_rpc = dynamic_cast(&rpc); EXPECT_EQ(t_rpc->Request()->op_type(), pb::meta::OP_GEN_TSO); return Status::NetworkError("mock error"); diff --git a/test/unit_test/sdk/transaction/test_txn_lock_resolver.cc b/test/unit_test/sdk/transaction/test_txn_lock_resolver.cc index c40e16a..0377b4f 100644 --- a/test/unit_test/sdk/transaction/test_txn_lock_resolver.cc +++ b/test/unit_test/sdk/transaction/test_txn_lock_resolver.cc @@ -59,7 +59,7 @@ TEST_F(SDKTxnLockResolverTest, Locked) { auto fake_tso = CurrentFakeTso(); - EXPECT_CALL(*meta_rpc_controller, SyncCall).WillOnce([&](Rpc& rpc) { + EXPECT_CALL(*tso_rpc_controller, SyncCall).WillOnce([&](Rpc& rpc) { auto* t_rpc = dynamic_cast(&rpc); EXPECT_EQ(t_rpc->Request()->op_type(), pb::meta::OP_GEN_TSO); t_rpc->MutableResponse()->set_count(FLAGS_tso_batch_size); @@ -106,7 +106,7 @@ TEST_F(SDKTxnLockResolverTest, Committed) { auto fake_tso = CurrentFakeTso(); - EXPECT_CALL(*meta_rpc_controller, SyncCall).WillOnce([&](Rpc& rpc) { + EXPECT_CALL(*tso_rpc_controller, SyncCall).WillOnce([&](Rpc& rpc) { auto* t_rpc = dynamic_cast(&rpc); EXPECT_EQ(t_rpc->Request()->op_type(), pb::meta::OP_GEN_TSO); t_rpc->MutableResponse()->set_count(FLAGS_tso_batch_size); @@ -179,7 +179,7 @@ TEST_F(SDKTxnLockResolverTest, CommittedResolvePrimaryKeyFail) { auto fake_tso = CurrentFakeTso(); - EXPECT_CALL(*meta_rpc_controller, SyncCall).WillOnce([&](Rpc& rpc) { + EXPECT_CALL(*tso_rpc_controller, SyncCall).WillOnce([&](Rpc& rpc) { auto* t_rpc = dynamic_cast(&rpc); EXPECT_EQ(t_rpc->Request()->op_type(), pb::meta::OP_GEN_TSO); t_rpc->MutableResponse()->set_count(FLAGS_tso_batch_size); @@ -255,7 +255,7 @@ TEST_F(SDKTxnLockResolverTest, CommittedResolveConflictKeyFail) { auto fake_tso = CurrentFakeTso(); - EXPECT_CALL(*meta_rpc_controller, SyncCall).WillOnce([&](Rpc& rpc) { + EXPECT_CALL(*tso_rpc_controller, SyncCall).WillOnce([&](Rpc& rpc) { auto* t_rpc = dynamic_cast(&rpc); EXPECT_EQ(t_rpc->Request()->op_type(), pb::meta::OP_GEN_TSO); t_rpc->MutableResponse()->set_count(FLAGS_tso_batch_size); @@ -331,7 +331,7 @@ TEST_F(SDKTxnLockResolverTest, Rollbacked) { auto fake_tso = CurrentFakeTso(); - EXPECT_CALL(*meta_rpc_controller, SyncCall).WillOnce([&](Rpc& rpc) { + EXPECT_CALL(*tso_rpc_controller, SyncCall).WillOnce([&](Rpc& rpc) { auto* t_rpc = dynamic_cast(&rpc); EXPECT_EQ(t_rpc->Request()->op_type(), pb::meta::OP_GEN_TSO); t_rpc->MutableResponse()->set_count(FLAGS_tso_batch_size); @@ -404,7 +404,7 @@ TEST_F(SDKTxnLockResolverTest, AsyncCommitCasePrimaryLockNotExpired) { auto fake_tso = CurrentFakeTso(); - EXPECT_CALL(*meta_rpc_controller, SyncCall).WillRepeatedly([&](Rpc& rpc) { + EXPECT_CALL(*tso_rpc_controller, SyncCall).WillRepeatedly([&](Rpc& rpc) { auto* t_rpc = dynamic_cast(&rpc); EXPECT_EQ(t_rpc->Request()->op_type(), pb::meta::OP_GEN_TSO); t_rpc->MutableResponse()->set_count(FLAGS_tso_batch_size); @@ -464,7 +464,7 @@ TEST_F(SDKTxnLockResolverTest, AsyncCommitCaseCommittedPrimaryLock) { auto fake_tso = CurrentFakeTso(); - EXPECT_CALL(*meta_rpc_controller, SyncCall).WillOnce([&](Rpc& rpc) { + EXPECT_CALL(*tso_rpc_controller, SyncCall).WillOnce([&](Rpc& rpc) { auto* t_rpc = dynamic_cast(&rpc); EXPECT_EQ(t_rpc->Request()->op_type(), pb::meta::OP_GEN_TSO); t_rpc->MutableResponse()->set_count(FLAGS_tso_batch_size); @@ -536,7 +536,7 @@ TEST_F(SDKTxnLockResolverTest, AsyncCommitCaseNotCommittedKeys) { auto fake_tso = CurrentFakeTso(); - EXPECT_CALL(*meta_rpc_controller, SyncCall).WillRepeatedly([&](Rpc& rpc) { + EXPECT_CALL(*tso_rpc_controller, SyncCall).WillRepeatedly([&](Rpc& rpc) { auto* t_rpc = dynamic_cast(&rpc); EXPECT_EQ(t_rpc->Request()->op_type(), pb::meta::OP_GEN_TSO); t_rpc->MutableResponse()->set_count(FLAGS_tso_batch_size); @@ -693,7 +693,7 @@ TEST_F(SDKTxnLockResolverTest, AsyncCommitCaseCommittedPartOfOrdinaryKeys) { auto fake_tso = CurrentFakeTso(); - EXPECT_CALL(*meta_rpc_controller, SyncCall).WillRepeatedly([&](Rpc& rpc) { + EXPECT_CALL(*tso_rpc_controller, SyncCall).WillRepeatedly([&](Rpc& rpc) { auto* t_rpc = dynamic_cast(&rpc); EXPECT_EQ(t_rpc->Request()->op_type(), pb::meta::OP_GEN_TSO); t_rpc->MutableResponse()->set_count(FLAGS_tso_batch_size); @@ -828,7 +828,7 @@ TEST_F(SDKTxnLockResolverTest, AsyncCommitCaseRollbackedPrimaryLock) { auto fake_tso = CurrentFakeTso(); - EXPECT_CALL(*meta_rpc_controller, SyncCall).WillOnce([&](Rpc& rpc) { + EXPECT_CALL(*tso_rpc_controller, SyncCall).WillOnce([&](Rpc& rpc) { auto* t_rpc = dynamic_cast(&rpc); EXPECT_EQ(t_rpc->Request()->op_type(), pb::meta::OP_GEN_TSO); t_rpc->MutableResponse()->set_count(FLAGS_tso_batch_size); @@ -900,7 +900,7 @@ TEST_F(SDKTxnLockResolverTest, AsyncCommitCaseRollbackedPartOfOrdinaryKeys) { auto fake_tso = CurrentFakeTso(); - EXPECT_CALL(*meta_rpc_controller, SyncCall).WillRepeatedly([&](Rpc& rpc) { + EXPECT_CALL(*tso_rpc_controller, SyncCall).WillRepeatedly([&](Rpc& rpc) { auto* t_rpc = dynamic_cast(&rpc); EXPECT_EQ(t_rpc->Request()->op_type(), pb::meta::OP_GEN_TSO); t_rpc->MutableResponse()->set_count(FLAGS_tso_batch_size); @@ -1035,7 +1035,7 @@ TEST_F(SDKTxnLockResolverTest, AsyncCommitCasePartOfOrdinaryKeysNotFound) { auto fake_tso = CurrentFakeTso(); - EXPECT_CALL(*meta_rpc_controller, SyncCall).WillRepeatedly([&](Rpc& rpc) { + EXPECT_CALL(*tso_rpc_controller, SyncCall).WillRepeatedly([&](Rpc& rpc) { auto* t_rpc = dynamic_cast(&rpc); EXPECT_EQ(t_rpc->Request()->op_type(), pb::meta::OP_GEN_TSO); t_rpc->MutableResponse()->set_count(FLAGS_tso_batch_size); @@ -1184,7 +1184,7 @@ TEST_F(SDKTxnLockResolverTest, AsyncCommitCaseTransferSyncCommit) { auto fake_tso = CurrentFakeTso(); - EXPECT_CALL(*meta_rpc_controller, SyncCall).WillRepeatedly([&](Rpc& rpc) { + EXPECT_CALL(*tso_rpc_controller, SyncCall).WillRepeatedly([&](Rpc& rpc) { auto* t_rpc = dynamic_cast(&rpc); EXPECT_EQ(t_rpc->Request()->op_type(), pb::meta::OP_GEN_TSO); t_rpc->MutableResponse()->set_count(FLAGS_tso_batch_size); @@ -1360,7 +1360,7 @@ TEST_F(SDKTxnLockResolverTest, TxnNotFoundTTLExpired) { CHECK(meta_cache->LookupRegionByKey(fake_lock.primary_lock(), region).IsOK()); CHECK_NOTNULL(region.get()); - EXPECT_CALL(*meta_rpc_controller, SyncCall).WillRepeatedly([&](Rpc& rpc) { + EXPECT_CALL(*tso_rpc_controller, SyncCall).WillRepeatedly([&](Rpc& rpc) { auto* t_rpc = dynamic_cast(&rpc); EXPECT_EQ(t_rpc->Request()->op_type(), pb::meta::OP_GEN_TSO); t_rpc->MutableResponse()->set_count(FLAGS_tso_batch_size); diff --git a/test/unit_test/sdk/transaction/test_txn_manager.cc b/test/unit_test/sdk/transaction/test_txn_manager.cc index fe6c236..7240d79 100644 --- a/test/unit_test/sdk/transaction/test_txn_manager.cc +++ b/test/unit_test/sdk/transaction/test_txn_manager.cc @@ -49,7 +49,7 @@ class SDKTxnManagerTest : public TestBase { options.kind = kOptimistic; options.isolation = kSnapshotIsolation; - ON_CALL(*meta_rpc_controller, SyncCall).WillByDefault([&](Rpc& rpc) { + ON_CALL(*tso_rpc_controller, SyncCall).WillByDefault([&](Rpc& rpc) { auto* t_rpc = dynamic_cast(&rpc); EXPECT_EQ(t_rpc->Request()->op_type(), pb::meta::OP_GEN_TSO); t_rpc->MutableResponse()->set_count(FLAGS_tso_batch_size); @@ -59,7 +59,7 @@ class SDKTxnManagerTest : public TestBase { return Status::OK(); }); - EXPECT_CALL(*meta_rpc_controller, SyncCall).Times(testing::AnyNumber()); + EXPECT_CALL(*tso_rpc_controller, SyncCall).Times(testing::AnyNumber()); ON_CALL(*txn_lock_resolver, ResolveLock).WillByDefault(testing::Return(Status::OK())); } diff --git a/test/unit_test/sdk/vector/test_vector_add_task.cc b/test/unit_test/sdk/vector/test_vector_add_task.cc index aa9c5ab..2867eff 100644 --- a/test/unit_test/sdk/vector/test_vector_add_task.cc +++ b/test/unit_test/sdk/vector/test_vector_add_task.cc @@ -15,9 +15,9 @@ #include #include +#include "dingosdk/vector.h" #include "gtest/gtest.h" #include "sdk/rpc/coordinator_rpc.h" -#include "dingosdk/vector.h" #include "sdk/vector/vector_add_task.h" #include "sdk/vector/vector_common.h" #include "test_base.h" @@ -73,7 +73,7 @@ static std::shared_ptr CreateFakeVectorIndex(int64_t start_id = 0) TEST_F(SDKVectorAddTaskTest, InitNoAutoIncreFail) { auto vector_index = CreateFakeVectorIndex(); - EXPECT_CALL(*meta_rpc_controller, SyncCall).WillOnce([&](Rpc& rpc) { + EXPECT_CALL(*coordinator_rpc_controller, SyncCall).WillOnce([&](Rpc& rpc) { auto* t_rpc = dynamic_cast(&rpc); EXPECT_EQ(t_rpc->Request()->index_id().entity_id(), vector_index->GetId()); *(t_rpc->MutableResponse()->mutable_index_definition_with_id()) = vector_index->GetIndexDefWithId(); @@ -95,7 +95,7 @@ TEST_F(SDKVectorAddTaskTest, InitNoAutoIncreFail) { TEST_F(SDKVectorAddTaskTest, InitNoAutoIncreSuccess) { auto vector_index = CreateFakeVectorIndex(); - EXPECT_CALL(*meta_rpc_controller, SyncCall).WillOnce([&](Rpc& rpc) { + EXPECT_CALL(*coordinator_rpc_controller, SyncCall).WillOnce([&](Rpc& rpc) { auto* t_rpc = dynamic_cast(&rpc); EXPECT_EQ(t_rpc->Request()->index_id().entity_id(), vector_index->GetId()); *(t_rpc->MutableResponse()->mutable_index_definition_with_id()) = vector_index->GetIndexDefWithId(); @@ -117,19 +117,19 @@ TEST_F(SDKVectorAddTaskTest, InitNoAutoIncreSuccess) { TEST_F(SDKVectorAddTaskTest, InitAutoIncreRewriteId) { auto vector_index = CreateFakeVectorIndex(1); - EXPECT_CALL(*meta_rpc_controller, SyncCall) - .WillOnce([&](Rpc& rpc) { - auto* t_rpc = dynamic_cast(&rpc); - EXPECT_EQ(t_rpc->Request()->index_id().entity_id(), vector_index->GetId()); - *(t_rpc->MutableResponse()->mutable_index_definition_with_id()) = vector_index->GetIndexDefWithId(); - return Status::OK(); - }) - .WillOnce([&](Rpc& rpc) { - auto* t_rpc = dynamic_cast(&rpc); - t_rpc->MutableResponse()->set_start_id(1); - t_rpc->MutableResponse()->set_end_id(100); - return Status::OK(); - }); + EXPECT_CALL(*coordinator_rpc_controller, SyncCall).WillOnce([&](Rpc& rpc) { + auto* t_rpc = dynamic_cast(&rpc); + EXPECT_EQ(t_rpc->Request()->index_id().entity_id(), vector_index->GetId()); + *(t_rpc->MutableResponse()->mutable_index_definition_with_id()) = vector_index->GetIndexDefWithId(); + return Status::OK(); + }); + + EXPECT_CALL(*auto_incrementer_rpc_controller, SyncCall).WillOnce([&](Rpc& rpc) { + auto* t_rpc = dynamic_cast(&rpc); + t_rpc->MutableResponse()->set_start_id(1); + t_rpc->MutableResponse()->set_end_id(100); + return Status::OK(); + }); std::vector ids; for (auto i = 0; i < 10; i++) { @@ -146,19 +146,19 @@ TEST_F(SDKVectorAddTaskTest, InitAutoIncreRewriteId) { TEST_F(SDKVectorAddTaskTest, InitAutoIncreSuccess) { auto vector_index = CreateFakeVectorIndex(1); - EXPECT_CALL(*meta_rpc_controller, SyncCall) - .WillOnce([&](Rpc& rpc) { - auto* t_rpc = dynamic_cast(&rpc); - EXPECT_EQ(t_rpc->Request()->index_id().entity_id(), vector_index->GetId()); - *(t_rpc->MutableResponse()->mutable_index_definition_with_id()) = vector_index->GetIndexDefWithId(); - return Status::OK(); - }) - .WillOnce([&](Rpc& rpc) { - auto* t_rpc = dynamic_cast(&rpc); - t_rpc->MutableResponse()->set_start_id(1); - t_rpc->MutableResponse()->set_end_id(100); - return Status::OK(); - }); + EXPECT_CALL(*coordinator_rpc_controller, SyncCall).WillOnce([&](Rpc& rpc) { + auto* t_rpc = dynamic_cast(&rpc); + EXPECT_EQ(t_rpc->Request()->index_id().entity_id(), vector_index->GetId()); + *(t_rpc->MutableResponse()->mutable_index_definition_with_id()) = vector_index->GetIndexDefWithId(); + return Status::OK(); + }); + + EXPECT_CALL(*auto_incrementer_rpc_controller, SyncCall).WillOnce([&](Rpc& rpc) { + auto* t_rpc = dynamic_cast(&rpc); + t_rpc->MutableResponse()->set_start_id(1); + t_rpc->MutableResponse()->set_end_id(100); + return Status::OK(); + }); std::vector ids; for (auto i = 0; i < 10; i++) { @@ -175,19 +175,19 @@ TEST_F(SDKVectorAddTaskTest, InitAutoIncreSuccess) { TEST_F(SDKVectorAddTaskTest, InitAutoIncreUseGenerateid) { auto vector_index = CreateFakeVectorIndex(1); - EXPECT_CALL(*meta_rpc_controller, SyncCall) - .WillOnce([&](Rpc& rpc) { - auto* t_rpc = dynamic_cast(&rpc); - EXPECT_EQ(t_rpc->Request()->index_id().entity_id(), vector_index->GetId()); - *(t_rpc->MutableResponse()->mutable_index_definition_with_id()) = vector_index->GetIndexDefWithId(); - return Status::OK(); - }) - .WillOnce([&](Rpc& rpc) { - auto* t_rpc = dynamic_cast(&rpc); - t_rpc->MutableResponse()->set_start_id(1); - t_rpc->MutableResponse()->set_end_id(100); - return Status::OK(); - }); + EXPECT_CALL(*coordinator_rpc_controller, SyncCall).WillOnce([&](Rpc& rpc) { + auto* t_rpc = dynamic_cast(&rpc); + EXPECT_EQ(t_rpc->Request()->index_id().entity_id(), vector_index->GetId()); + *(t_rpc->MutableResponse()->mutable_index_definition_with_id()) = vector_index->GetIndexDefWithId(); + return Status::OK(); + }); + + EXPECT_CALL(*auto_incrementer_rpc_controller, SyncCall).WillOnce([&](Rpc& rpc) { + auto* t_rpc = dynamic_cast(&rpc); + t_rpc->MutableResponse()->set_start_id(1); + t_rpc->MutableResponse()->set_end_id(100); + return Status::OK(); + }); int64_t count = 10; std::vector ids; diff --git a/test/unit_test/sdk/vector/test_vector_index_cache.cc b/test/unit_test/sdk/vector/test_vector_index_cache.cc index d6939ca..8b5f6c6 100644 --- a/test/unit_test/sdk/vector/test_vector_index_cache.cc +++ b/test/unit_test/sdk/vector/test_vector_index_cache.cc @@ -53,7 +53,7 @@ class SDKVectorIndexCacheTest : public TestBase { TEST_F(SDKVectorIndexCacheTest, GetIndexIdByNameNotOK) { std::string index_name = "test"; - EXPECT_CALL(*meta_rpc_controller, SyncCall).WillOnce([&](Rpc& rpc) { + EXPECT_CALL(*coordinator_rpc_controller, SyncCall).WillOnce([&](Rpc& rpc) { auto* t_rpc = dynamic_cast(&rpc); EXPECT_EQ(t_rpc->Request()->index_name(), index_name); return Status::RemoteError("mock error"); @@ -69,7 +69,7 @@ TEST_F(SDKVectorIndexCacheTest, GetIndexIdByNameNotOK) { TEST_F(SDKVectorIndexCacheTest, GetVectorIndexByKeyNotOK) { std::string index_name = "test"; - EXPECT_CALL(*meta_rpc_controller, SyncCall).WillOnce([&](Rpc& rpc) { + EXPECT_CALL(*coordinator_rpc_controller, SyncCall).WillOnce([&](Rpc& rpc) { auto* t_rpc = dynamic_cast(&rpc); EXPECT_EQ(t_rpc->Request()->index_name(), index_name); return Status::RemoteError("mock error"); @@ -84,7 +84,7 @@ TEST_F(SDKVectorIndexCacheTest, GetVectorIndexByKeyNotOK) { TEST_F(SDKVectorIndexCacheTest, GetVectorIndexByIdNotOK) { int64_t index_id = 2; - EXPECT_CALL(*meta_rpc_controller, SyncCall).WillOnce([&](Rpc& rpc) { + EXPECT_CALL(*coordinator_rpc_controller, SyncCall).WillOnce([&](Rpc& rpc) { auto* t_rpc = dynamic_cast(&rpc); EXPECT_EQ(t_rpc->Request()->index_id().entity_type(), pb::meta::EntityType::ENTITY_TYPE_INDEX); EXPECT_EQ(t_rpc->Request()->index_id().parent_entity_id(), ::dingodb::pb::meta::ReservedSchemaIds::DINGO_SCHEMA); @@ -106,7 +106,7 @@ TEST_F(SDKVectorIndexCacheTest, GetVectorIndexByKeyOK) { std::vector range_seperator_ids = {5, 10, 20}; FlatParam flat_param(1000, dingodb::sdk::MetricType::kL2); - EXPECT_CALL(*meta_rpc_controller, SyncCall).WillOnce([&](Rpc& rpc) { + EXPECT_CALL(*coordinator_rpc_controller, SyncCall).WillOnce([&](Rpc& rpc) { auto* t_rpc = dynamic_cast(&rpc); EXPECT_EQ(t_rpc->Request()->index_name(), index_name); FillVectorIndexId(t_rpc->MutableResponse()->mutable_index_definition_with_id()->mutable_index_id(), index_id, @@ -144,7 +144,7 @@ TEST_F(SDKVectorIndexCacheTest, GetVectorIndexByKeyOK) { EXPECT_EQ(index->GetVectorIndexType(), flat_param.Type()); } - EXPECT_CALL(*meta_rpc_controller, SyncCall) + EXPECT_CALL(*coordinator_rpc_controller, SyncCall) .WillOnce([&](Rpc& rpc) { auto* t_rpc = dynamic_cast(&rpc); EXPECT_EQ(t_rpc->Request()->index_name(), index_name); @@ -185,7 +185,7 @@ TEST_F(SDKVectorIndexCacheTest, GetVectorIndexByIdOK) { std::vector range_seperator_ids = {5, 10, 20}; FlatParam flat_param(1000, dingodb::sdk::MetricType::kL2); - EXPECT_CALL(*meta_rpc_controller, SyncCall).WillOnce([&](Rpc& rpc) { + EXPECT_CALL(*coordinator_rpc_controller, SyncCall).WillOnce([&](Rpc& rpc) { auto* t_rpc = dynamic_cast(&rpc); EXPECT_EQ(t_rpc->Request()->index_id().entity_type(), pb::meta::EntityType::ENTITY_TYPE_INDEX); EXPECT_EQ(t_rpc->Request()->index_id().parent_entity_id(), ::dingodb::pb::meta::ReservedSchemaIds::DINGO_SCHEMA); @@ -226,7 +226,7 @@ TEST_F(SDKVectorIndexCacheTest, GetVectorIndexByIdOK) { EXPECT_EQ(index->GetVectorIndexType(), flat_param.Type()); } - EXPECT_CALL(*meta_rpc_controller, SyncCall) + EXPECT_CALL(*coordinator_rpc_controller, SyncCall) .WillOnce([&](Rpc& rpc) { auto* t_rpc = dynamic_cast(&rpc); EXPECT_EQ(t_rpc->Request()->index_id().entity_type(), pb::meta::EntityType::ENTITY_TYPE_INDEX);