Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ def build_extension(self, ext: CMakeExtension) -> None:
"""
setup(
name="dingosdk",
version="0.3rc1.post7",
version="0.3rc1.post8",
author="DingoDB",
author_email="[email protected]",
description="dingo-store python sdk",
Expand Down
4 changes: 2 additions & 2 deletions src/sdk/admin_tool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ Status AdminTool::CreateTableIds(int64_t count, std::vector<int64_t>& out_table_

rpc.MutableRequest()->set_count(count);

Status ret = stub_.GetMetaRpcController()->SyncCall(rpc);
Status ret = stub_.GetCoordinatorRpcController()->SyncCall(rpc);
if (!ret.ok()) {
return ret;
}
Expand All @@ -89,7 +89,7 @@ Status AdminTool::DropIndex(int64_t index_id) {
index_pb->set_parent_entity_id(::dingodb::pb::meta::ReservedSchemaIds::DINGO_SCHEMA);
index_pb->set_entity_id(index_id);

Status s = stub_.GetMetaRpcController()->SyncCall(rpc);
Status s = stub_.GetCoordinatorRpcController()->SyncCall(rpc);
if (s.IsNotFound()) {
s = Status::OK();
}
Expand Down
4 changes: 2 additions & 2 deletions src/sdk/auto_increment_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ Status AutoIncrementer::UpdateAutoIncrementId(int64_t start_id) {
return RunOperation([this, start_id]() {
UpdateAutoIncrementRpc rpc;
PrepareUpdateAutoIncrementRequest(*rpc.MutableRequest(), start_id);
DINGO_RETURN_NOT_OK(stub_.GetMetaRpcController()->SyncCall(rpc));
DINGO_RETURN_NOT_OK(stub_.GetAutoIncrementerRpcController()->SyncCall(rpc));
VLOG(kSdkVlogLevel) << "UpdateAutoIncrement request:" << rpc.Request()->ShortDebugString()
<< " response:" << rpc.Response()->ShortDebugString();
id_cache_.clear();
Expand Down Expand Up @@ -115,7 +115,7 @@ Status AutoIncrementer::RefillCache() {
GenerateAutoIncrementRpc rpc;
PrepareGenerateAutoIncrementRequest(*rpc.MutableRequest());

DINGO_RETURN_NOT_OK(stub_.GetMetaRpcController()->SyncCall(rpc));
DINGO_RETURN_NOT_OK(stub_.GetAutoIncrementerRpcController()->SyncCall(rpc));
VLOG(kSdkVlogLevel) << "GenerateAutoIncrement request:" << rpc.Request()->ShortDebugString()
<< " response:" << rpc.Response()->ShortDebugString();
// TODO: maybe not crash just return error msg
Expand Down
7 changes: 5 additions & 2 deletions src/sdk/client_stub.cc
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,11 @@ Status ClientStub::Open(const std::vector<EndPoint>& endpoints) {
coordinator_rpc_controller_ = std::make_shared<CoordinatorRpcController>(*this);
coordinator_rpc_controller_->Open(endpoints);

meta_rpc_controller_ = std::make_shared<CoordinatorRpcController>(*this);
meta_rpc_controller_->Open(endpoints);
tso_rpc_controller_ = std::make_shared<CoordinatorRpcController>(*this);
tso_rpc_controller_->Open(endpoints);

auto_incrementer_rpc_controller_ = std::make_shared<CoordinatorRpcController>(*this);
auto_incrementer_rpc_controller_->Open(endpoints);

version_rpc_controller_ = std::make_shared<CoordinatorRpcController>(*this);
version_rpc_controller_->Open(endpoints);
Expand Down
14 changes: 10 additions & 4 deletions src/sdk/client_stub.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,14 @@ class ClientStub {
return coordinator_rpc_controller_;
}

virtual std::shared_ptr<CoordinatorRpcController> GetMetaRpcController() const {
DCHECK_NOTNULL(meta_rpc_controller_.get());
return meta_rpc_controller_;
virtual std::shared_ptr<CoordinatorRpcController> GetTsoRpcController() const {
DCHECK_NOTNULL(tso_rpc_controller_.get());
return tso_rpc_controller_;
}

virtual std::shared_ptr<CoordinatorRpcController> GetAutoIncrementerRpcController() const {
DCHECK_NOTNULL(auto_incrementer_rpc_controller_.get());
return auto_incrementer_rpc_controller_;
}

virtual std::shared_ptr<CoordinatorRpcController> GetVersionRpcController() const {
Expand Down Expand Up @@ -128,7 +133,8 @@ class ClientStub {
private:
// TODO: use unique ptr
std::shared_ptr<CoordinatorRpcController> coordinator_rpc_controller_;
std::shared_ptr<CoordinatorRpcController> meta_rpc_controller_;
std::shared_ptr<CoordinatorRpcController> tso_rpc_controller_;
std::shared_ptr<CoordinatorRpcController> auto_incrementer_rpc_controller_;
std::shared_ptr<CoordinatorRpcController> version_rpc_controller_;
std::shared_ptr<MetaCache> meta_cache_;
std::shared_ptr<RpcClient> rpc_client_;
Expand Down
12 changes: 6 additions & 6 deletions src/sdk/coordinator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ Status Coordinator::CreateAutoIncrement(int64_t table_id, int64_t start_id) {

mut_request->set_start_id(start_id);

Status status = stub_.GetMetaRpcController()->SyncCall(rpc);
Status status = stub_.GetAutoIncrementerRpcController()->SyncCall(rpc);
if (!status.IsOK()) {
DINGO_LOG(ERROR) << fmt::format("Create auto increment fail, error: {} {}", status.Errno(), status.ToString());
return status;
Expand All @@ -73,7 +73,7 @@ Status Coordinator::DeleteAutoIncrement(int64_t table_id) {
mut_table_id->set_entity_id(table_id);
mut_table_id->set_parent_entity_id(0);

Status status = stub_.GetMetaRpcController()->SyncCall(rpc);
Status status = stub_.GetAutoIncrementerRpcController()->SyncCall(rpc);
if (!status.IsOK()) {
DINGO_LOG(ERROR) << fmt::format("Delete auto increment fail, error: {} {}", status.Errno(), status.ToString());
return status;
Expand All @@ -93,7 +93,7 @@ Status Coordinator::UpdateAutoIncrement(int64_t table_id, int64_t start_id) {
mut_request->set_start_id(start_id);
mut_request->set_force(false);

Status status = stub_.GetMetaRpcController()->SyncCall(rpc);
Status status = stub_.GetAutoIncrementerRpcController()->SyncCall(rpc);
if (!status.IsOK()) {
DINGO_LOG(ERROR) << fmt::format("Update auto increment fail, error: {} {}", status.Errno(), status.ToString());
return status;
Expand All @@ -114,7 +114,7 @@ Status Coordinator::GenerateAutoIncrement(int64_t table_id, int64_t count, int64
mut_request->set_auto_increment_increment(1);
mut_request->set_auto_increment_offset(1);

Status status = stub_.GetMetaRpcController()->SyncCall(rpc);
Status status = stub_.GetAutoIncrementerRpcController()->SyncCall(rpc);
if (!status.IsOK()) {
DINGO_LOG(ERROR) << fmt::format("Generate auto increment fail, error: {} {}", status.Errno(), status.ToString());
return status;
Expand All @@ -134,7 +134,7 @@ Status Coordinator::GetAutoIncrement(int64_t table_id, int64_t& start_id) {
mut_table_id->set_entity_id(table_id);
mut_table_id->set_parent_entity_id(0);

Status status = stub_.GetMetaRpcController()->SyncCall(rpc);
Status status = stub_.GetAutoIncrementerRpcController()->SyncCall(rpc);
if (!status.IsOK()) {
if (status.Errno() == pb::error::EAUTO_INCREMENT_NOT_FOUND) {
return Status::NotFound("auto increment not found");
Expand All @@ -150,7 +150,7 @@ Status Coordinator::GetAutoIncrement(int64_t table_id, int64_t& start_id) {

Status Coordinator::GetAutoIncrements(std::vector<TableIncrement>& table_increments) {
GetAutoIncrementsRpc rpc;
Status status = stub_.GetMetaRpcController()->SyncCall(rpc);
Status status = stub_.GetAutoIncrementerRpcController()->SyncCall(rpc);
if (!status.IsOK()) {
DINGO_LOG(ERROR) << fmt::format("Get all auto increment fail, error: {} {}", status.Errno(), status.ToString());
return status;
Expand Down
4 changes: 2 additions & 2 deletions src/sdk/document/document_index_cache.cc
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ Status DocumentIndexCache::SlowGetDocumentIndexByKey(const DocumentIndexCacheKey
schema->set_entity_id(schema_id);
rpc.MutableRequest()->set_index_name(index_name);

DINGO_RETURN_NOT_OK(stub_.GetMetaRpcController()->SyncCall(rpc));
DINGO_RETURN_NOT_OK(stub_.GetCoordinatorRpcController()->SyncCall(rpc));

if (CheckIndexResponse(*rpc.Response())) {
return ProcessIndexDefinitionWithId(rpc.Response()->index_definition_with_id(), out_doc_index);
Expand All @@ -134,7 +134,7 @@ Status DocumentIndexCache::SlowGetDocumentIndexById(int64_t index_id, std::share
index_id_pb->set_parent_entity_id(::dingodb::pb::meta::ReservedSchemaIds::DINGO_SCHEMA);
index_id_pb->set_entity_id(index_id);

DINGO_RETURN_NOT_OK(stub_.GetMetaRpcController()->SyncCall(rpc));
DINGO_RETURN_NOT_OK(stub_.GetCoordinatorRpcController()->SyncCall(rpc));

if (CheckIndexResponse(*rpc.Response())) {
return ProcessIndexDefinitionWithId(rpc.Response()->index_definition_with_id(), out_doc_index);
Expand Down
2 changes: 1 addition & 1 deletion src/sdk/document/document_index_creator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ Status DocumentIndexCreator::Create(int64_t& out_index_id) {

out_index_id = new_index_id;

return data_->stub.GetMetaRpcController()->SyncCall(rpc);
return data_->stub.GetCoordinatorRpcController()->SyncCall(rpc);
}

} // namespace sdk
Expand Down
3 changes: 3 additions & 0 deletions src/sdk/meta_member_info.cc
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,12 @@
// limitations under the License.

#include "sdk/meta_member_info.h"
#include <fmt/format.h>

#include <algorithm>
#include <mutex>

#include "common/logging.h"
#include "glog/logging.h"
#include "sdk/utils/net_util.h"

Expand All @@ -33,6 +35,7 @@ EndPoint MetaMemberInfo::PickNextLeader() {
leader = leader_;
} else {
leader = members_[next_ % members_.size()];
DINGO_LOG(INFO) << fmt::format("[sdk.meta]Pick next leader: {}", leader.ToString());
next_++;
}
}
Expand Down
2 changes: 0 additions & 2 deletions src/sdk/rpc/coordinator_rpc_controller.cc
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,6 @@ bool CoordinatorRpcController::NeedPickLeader(Rpc& rpc) {
void CoordinatorRpcController::PrepareRpc(Rpc& rpc) {
if (NeedPickLeader(rpc)) {
EndPoint next_leader = meta_member_info_.PickNextLeader();
DINGO_LOG(INFO) << fmt::format("[sdk.rpc.{}]Pick next leader: {}, rpc method: {}", rpc.LogId(),
next_leader.ToString(), rpc.Method());

CHECK(next_leader.IsValid());
rpc.SetEndPoint(next_leader);
Expand Down
2 changes: 1 addition & 1 deletion src/sdk/transaction/tso.cc
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ Status TsoProvider::FetchTso(uint32_t count) {
rpc.MutableRequest()->set_op_type(pb::meta::TsoOpType::OP_GEN_TSO);
rpc.MutableRequest()->set_count(count);

auto status = stub_.GetMetaRpcController()->SyncCall(rpc);
auto status = stub_.GetTsoRpcController()->SyncCall(rpc);
if (!status.IsOK()) {
DINGO_LOG(ERROR) << fmt::format("[sdk.tso] fetch tso fail, status({}).", status.ToString());
return status;
Expand Down
4 changes: 2 additions & 2 deletions src/sdk/vector/vector_index_cache.cc
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ Status VectorIndexCache::SlowGetVectorIndexByKey(const VectorIndexCacheKey& inde
schema->set_entity_id(schema_id);
rpc.MutableRequest()->set_index_name(index_name);

DINGO_RETURN_NOT_OK(stub_.GetMetaRpcController()->SyncCall(rpc));
DINGO_RETURN_NOT_OK(stub_.GetCoordinatorRpcController()->SyncCall(rpc));

if (CheckIndexResponse(*rpc.Response())) {
return ProcessIndexDefinitionWithId(rpc.Response()->index_definition_with_id(), out_vector_index);
Expand All @@ -134,7 +134,7 @@ Status VectorIndexCache::SlowGetVectorIndexById(int64_t index_id, std::shared_pt
index_id_pb->set_parent_entity_id(::dingodb::pb::meta::ReservedSchemaIds::DINGO_SCHEMA);
index_id_pb->set_entity_id(index_id);

DINGO_RETURN_NOT_OK(stub_.GetMetaRpcController()->SyncCall(rpc));
DINGO_RETURN_NOT_OK(stub_.GetCoordinatorRpcController()->SyncCall(rpc));

if (CheckIndexResponse(*rpc.Response())) {
return ProcessIndexDefinitionWithId(rpc.Response()->index_definition_with_id(), out_vector_index);
Expand Down
2 changes: 1 addition & 1 deletion src/sdk/vector/vector_index_creator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ Status VectorIndexCreator::Create(int64_t& out_index_id) {

out_index_id = new_index_id;

return data_->stub.GetMetaRpcController()->SyncCall(rpc);
return data_->stub.GetCoordinatorRpcController()->SyncCall(rpc);
}

} // namespace sdk
Expand Down
3 changes: 2 additions & 1 deletion test/unit_test/sdk/mock_client_stub.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ class MockClientStub final : public ClientStub {
~MockClientStub() override = default;

MOCK_METHOD(std::shared_ptr<CoordinatorRpcController>, GetCoordinatorRpcController, (), (const, override));
MOCK_METHOD(std::shared_ptr<CoordinatorRpcController>, GetMetaRpcController, (), (const, override));
MOCK_METHOD(std::shared_ptr<CoordinatorRpcController>, GetTsoRpcController, (), (const, override));
MOCK_METHOD(std::shared_ptr<CoordinatorRpcController>, GetAutoIncrementerRpcController, (), (const, override));
MOCK_METHOD(std::shared_ptr<MetaCache>, GetMetaCache, (), (const, override));
MOCK_METHOD(std::shared_ptr<RpcClient>, GetRpcClient, (), (const, override));
MOCK_METHOD(std::shared_ptr<RegionScannerFactory>, GetRawKvRegionScannerFactory, (), (const, override));
Expand Down
14 changes: 7 additions & 7 deletions test/unit_test/sdk/test_auto_increment_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ class SDKAutoInrementerTest : public TestBase {
};

TEST_F(SDKAutoInrementerTest, CacheEmpty) {
EXPECT_CALL(*meta_rpc_controller, SyncCall).WillOnce([&](Rpc& rpc) {
EXPECT_CALL(*auto_incrementer_rpc_controller, SyncCall).WillOnce([&](Rpc& rpc) {
auto* t_rpc = dynamic_cast<GenerateAutoIncrementRpc*>(&rpc);
EXPECT_EQ(t_rpc->Request()->count(), FLAGS_auto_incre_req_count);
EXPECT_EQ(t_rpc->Request()->auto_increment_increment(), 1);
Expand All @@ -87,7 +87,7 @@ TEST_F(SDKAutoInrementerTest, CacheEmpty) {
}

TEST_F(SDKAutoInrementerTest, FromCache) {
EXPECT_CALL(*meta_rpc_controller, SyncCall).WillOnce([&](Rpc& rpc) {
EXPECT_CALL(*auto_incrementer_rpc_controller, SyncCall).WillOnce([&](Rpc& rpc) {
auto* t_rpc = dynamic_cast<GenerateAutoIncrementRpc*>(&rpc);
EXPECT_EQ(t_rpc->Request()->count(), FLAGS_auto_incre_req_count);
EXPECT_EQ(t_rpc->Request()->auto_increment_increment(), 1);
Expand All @@ -114,7 +114,7 @@ TEST_F(SDKAutoInrementerTest, FromCache) {
}

TEST_F(SDKAutoInrementerTest, FromCacheThenRefill) {
EXPECT_CALL(*meta_rpc_controller, SyncCall)
EXPECT_CALL(*auto_incrementer_rpc_controller, SyncCall)
.WillOnce([&](Rpc& rpc) {
auto* t_rpc = dynamic_cast<GenerateAutoIncrementRpc*>(&rpc);
EXPECT_EQ(t_rpc->Request()->count(), FLAGS_auto_incre_req_count);
Expand Down Expand Up @@ -159,7 +159,7 @@ TEST_F(SDKAutoInrementerTest, FromCacheThenRefill) {
}

TEST_F(SDKAutoInrementerTest, MultiThreadGetNextId) {
EXPECT_CALL(*meta_rpc_controller, SyncCall)
EXPECT_CALL(*auto_incrementer_rpc_controller, SyncCall)
.WillOnce([&](Rpc& rpc) {
auto* t_rpc = dynamic_cast<GenerateAutoIncrementRpc*>(&rpc);
EXPECT_EQ(t_rpc->Request()->count(), FLAGS_auto_incre_req_count);
Expand Down Expand Up @@ -205,7 +205,7 @@ TEST_F(SDKAutoInrementerTest, MultiThreadGetNextId) {
}

TEST_F(SDKAutoInrementerTest, CacheGetAutoId) {
EXPECT_CALL(*meta_rpc_controller, SyncCall).WillOnce([&](Rpc& rpc) {
EXPECT_CALL(*auto_incrementer_rpc_controller, SyncCall).WillOnce([&](Rpc& rpc) {
auto* t_rpc = dynamic_cast<GenerateAutoIncrementRpc*>(&rpc);
EXPECT_EQ(t_rpc->Request()->count(), FLAGS_auto_incre_req_count);
EXPECT_EQ(t_rpc->Request()->auto_increment_increment(), 1);
Expand All @@ -225,7 +225,7 @@ TEST_F(SDKAutoInrementerTest, CacheGetAutoId) {
}

TEST_F(SDKAutoInrementerTest, CacheUpdate) {
EXPECT_CALL(*meta_rpc_controller, SyncCall)
EXPECT_CALL(*auto_incrementer_rpc_controller, SyncCall)
.WillOnce([&](Rpc& rpc) {
auto* t_rpc = dynamic_cast<GenerateAutoIncrementRpc*>(&rpc);
EXPECT_EQ(t_rpc->Request()->count(), FLAGS_auto_incre_req_count);
Expand Down Expand Up @@ -276,7 +276,7 @@ TEST_F(SDKAutoInrementerTest, CacheUpdate) {
}

TEST_F(SDKAutoInrementerTest, MultiThreadUpdateId) {
EXPECT_CALL(*meta_rpc_controller, SyncCall)
EXPECT_CALL(*auto_incrementer_rpc_controller, SyncCall)
.WillOnce([&](Rpc& rpc) {
auto* t_rpc = dynamic_cast<GenerateAutoIncrementRpc*>(&rpc);
EXPECT_EQ(t_rpc->Request()->count(), FLAGS_auto_incre_req_count);
Expand Down
18 changes: 12 additions & 6 deletions test/unit_test/sdk/test_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,15 @@ class TestBase : public ::testing::Test {
EXPECT_CALL(*stub, GetCoordinatorRpcController).Times(testing::AnyNumber());
ON_CALL(*coordinator_rpc_controller, SyncCall).WillByDefault(testing::Return(Status::OK()));

meta_rpc_controller = std::make_shared<MockCoordinatorRpcController>(*stub);
ON_CALL(*stub, GetMetaRpcController).WillByDefault(testing::Return(meta_rpc_controller));
EXPECT_CALL(*stub, GetMetaRpcController).Times(testing::AnyNumber());
ON_CALL(*meta_rpc_controller, SyncCall).WillByDefault(testing::Return(Status::OK()));
tso_rpc_controller = std::make_shared<MockCoordinatorRpcController>(*stub);
ON_CALL(*stub, GetTsoRpcController).WillByDefault(testing::Return(tso_rpc_controller));
EXPECT_CALL(*stub, GetTsoRpcController).Times(testing::AnyNumber());
ON_CALL(*tso_rpc_controller, SyncCall).WillByDefault(testing::Return(Status::OK()));

auto_incrementer_rpc_controller = std::make_shared<MockCoordinatorRpcController>(*stub);
ON_CALL(*stub, GetAutoIncrementerRpcController).WillByDefault(testing::Return(auto_incrementer_rpc_controller));
EXPECT_CALL(*stub, GetAutoIncrementerRpcController).Times(testing::AnyNumber());
ON_CALL(*auto_incrementer_rpc_controller, SyncCall).WillByDefault(testing::Return(Status::OK()));

meta_cache = std::make_shared<MetaCache>(coordinator_rpc_controller);
ON_CALL(*stub, GetMetaCache).WillByDefault(testing::Return(meta_cache));
Expand Down Expand Up @@ -122,7 +127,7 @@ class TestBase : public ::testing::Test {
if (txn_actuator) {
txn_actuator->Stop();
}

delete client;
}

Expand All @@ -144,7 +149,8 @@ class TestBase : public ::testing::Test {
}

std::shared_ptr<MockCoordinatorRpcController> coordinator_rpc_controller;
std::shared_ptr<MockCoordinatorRpcController> meta_rpc_controller;
std::shared_ptr<MockCoordinatorRpcController> tso_rpc_controller;
std::shared_ptr<MockCoordinatorRpcController> auto_incrementer_rpc_controller;
std::shared_ptr<MetaCache> meta_cache;
std::shared_ptr<MockRpcClient> rpc_client;
std::shared_ptr<MockRegionScannerFactory> region_scanner_factory;
Expand Down
6 changes: 3 additions & 3 deletions test/unit_test/sdk/transaction/test_txn_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ class SDKTxnImplTest : public TestBase {
options.kind = kOptimistic;
options.isolation = kSnapshotIsolation;

ON_CALL(*meta_rpc_controller, SyncCall).WillByDefault([&](Rpc& rpc) {
ON_CALL(*tso_rpc_controller, SyncCall).WillByDefault([&](Rpc& rpc) {
auto* t_rpc = dynamic_cast<TsoServiceRpc*>(&rpc);
EXPECT_EQ(t_rpc->Request()->op_type(), pb::meta::OP_GEN_TSO);
t_rpc->MutableResponse()->set_count(FLAGS_tso_batch_size);
Expand All @@ -65,7 +65,7 @@ class SDKTxnImplTest : public TestBase {
return Status::OK();
});

EXPECT_CALL(*meta_rpc_controller, SyncCall).Times(testing::AnyNumber());
EXPECT_CALL(*tso_rpc_controller, SyncCall).Times(testing::AnyNumber());

ON_CALL(*txn_lock_resolver, ResolveLock).WillByDefault(testing::Return(Status::OK()));
}
Expand All @@ -74,7 +74,7 @@ class SDKTxnImplTest : public TestBase {
};

TEST_F(SDKTxnImplTest, BeginFail) {
ON_CALL(*meta_rpc_controller, SyncCall).WillByDefault([&](Rpc& rpc) {
ON_CALL(*tso_rpc_controller, SyncCall).WillByDefault([&](Rpc& rpc) {
auto* t_rpc = dynamic_cast<TsoServiceRpc*>(&rpc);
EXPECT_EQ(t_rpc->Request()->op_type(), pb::meta::OP_GEN_TSO);
return Status::NetworkError("mock error");
Expand Down
Loading