Skip to content

Commit d7b839f

Browse files
LiuRuoyu01chuandew
authored andcommitted
[feat][sdk]Support concurrent prewrite
1 parent 5eb6670 commit d7b839f

File tree

13 files changed

+237
-85
lines changed

13 files changed

+237
-85
lines changed

include/dingosdk/client.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -263,6 +263,8 @@ class Transaction {
263263
bool IsOnePc() const;
264264

265265
bool IsAsyncCommit() const;
266+
267+
bool IsConcurrentPreCommit() const;
266268

267269
private:
268270
friend class Client;

src/sdk/client.cc

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -601,6 +601,8 @@ bool Transaction::IsOnePc() const { return data_->impl->IsOnePc(); }
601601

602602
bool Transaction::IsAsyncCommit() const { return data_->impl->IsAsyncCommit(); }
603603

604+
bool Transaction::IsConcurrentPreCommit() const { return data_->impl->IsConcurrentPreCommit(); }
605+
604606
RegionCreator::RegionCreator(Data* data) : data_(data) {}
605607

606608
RegionCreator::~RegionCreator() { delete data_; }

src/sdk/common/param_config.cc

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,14 +49,15 @@ DEFINE_int64(txn_op_max_retry, 20, "txn op max retry times");
4949

5050
DEFINE_int64(txn_prewrite_delay_ms, 500, "txn prewrite delay ms");
5151
DEFINE_int64(txn_prewrite_max_retry, 300, "txn prewrite max retry");
52+
DEFINE_bool(enable_txn_concurrent_prewrite, true, "enable txn concurrent prewrite");
5253

5354
DEFINE_int64(raw_kv_delay_ms, 500, "raw kv backoff delay ms");
5455
DEFINE_int64(raw_kv_max_retry, 10, "raw kv max retry times");
5556

5657
DEFINE_int64(vector_op_delay_ms, 500, "vector task base backoff delay ms");
5758
DEFINE_int64(vector_op_max_retry, 30, "vector task max retry times");
5859

59-
DEFINE_int64(txn_max_batch_count, 1000, "txn max batch count");
60+
DEFINE_int64(txn_max_batch_count, 4096, "txn max batch count");
6061
DEFINE_int64(txn_max_async_commit_count, 256, "txn max async commit count");
6162
DEFINE_bool(enable_txn_async_commit, true, "enable txn async commit");
6263

@@ -65,5 +66,7 @@ DEFINE_bool(log_rpc_time, false, "log rpc time");
6566
DEFINE_int64(txn_heartbeat_interval_ms, 8000, "txn heartbeat interval time");
6667
DEFINE_int64(txn_heartbeat_lock_delay_ms, 20000, "txn heartbeat lock delay time");
6768

69+
DEFINE_int64(txn_check_status_interval_ms, 100, "txn check status interval ms");
70+
6871
DEFINE_uint32(stale_period_us, 1000, "stale period us default 1000 us, used for tso provider");
6972
DEFINE_uint32(tso_batch_size, 256, "tso batch size default 256, used for tso provider");

src/sdk/common/param_config.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ DECLARE_int64(txn_op_max_retry);
6666

6767
DECLARE_int64(txn_prewrite_delay_ms);
6868
DECLARE_int64(txn_prewrite_max_retry);
69+
DECLARE_bool(enable_txn_concurrent_prewrite);
6970

7071
DECLARE_int64(vector_op_delay_ms);
7172
DECLARE_int64(vector_op_max_retry);
@@ -79,6 +80,8 @@ DECLARE_bool(log_rpc_time);
7980
DECLARE_int64(txn_heartbeat_interval_ms);
8081
DECLARE_int64(txn_heartbeat_lock_delay_ms);
8182

83+
DECLARE_int64(txn_check_status_interval_ms);
84+
8285
DECLARE_uint32(stale_period_us);
8386
DECLARE_uint32(tso_batch_size);
8487

src/sdk/transaction/txn_impl.cc

Lines changed: 70 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -443,9 +443,11 @@ void TxnImpl::ScheduleHeartBeat() {
443443
void TxnImpl::DoHeartBeat(int64_t start_ts, std::string primary_key) {
444444
State state = state_.load();
445445
if (state != kPreCommitted && state != kPreCommitting && state != kCommitting) {
446+
DINGO_LOG(DEBUG) << fmt::format("[sdk.txn.{}] heartbeat stop, state({}).", ID(), StateName(state));
446447
return;
447448
}
448-
if (use_async_commit_.load() && state == kPreCommitted) {
449+
if (use_async_commit_.load() && state != kPreCommitting) {
450+
DINGO_LOG(DEBUG) << fmt::format("[sdk.txn.{}] heartbeat stop for async commit, state({}).", ID(), StateName(state));
449451
return;
450452
}
451453
std::shared_ptr<TxnHeartbeatTask> heartbeat_task = std::make_shared<TxnHeartbeatTask>(stub_, start_ts, primary_key);
@@ -500,6 +502,8 @@ Status TxnImpl::DoPreCommit() {
500502

501503
use_async_commit_.store(buffer_->MutationsSize() < FLAGS_txn_max_async_commit_count && FLAGS_enable_txn_async_commit);
502504

505+
use_concurrent_precommit_.store(FLAGS_enable_txn_concurrent_prewrite);
506+
503507
Status status = is_one_pc_.load() ? PreCommit1PC() : PreCommit2PC();
504508

505509
if (!status.ok()) {
@@ -554,6 +558,20 @@ Status TxnImpl::PreCommit1PC() {
554558
Status TxnImpl::PreCommit2PC() {
555559
DINGO_LOG(DEBUG) << fmt::format("[sdk.txn.{}] precommit primary key, pk({}).", ID(),
556560
StringToHex(buffer_->GetPrimaryKey()));
561+
562+
bool is_one_pc = is_one_pc_.load();
563+
CHECK(!is_one_pc) << fmt::format("[sdk.txn.{}] precommit 2pc but is_one_pc is true.", ID());
564+
565+
if (use_concurrent_precommit_.load()) {
566+
DINGO_LOG(DEBUG) << fmt::format("[sdk.txn.{}] precommit use concurrent prewrite.", ID());
567+
return PreCommit2PCConcurrent();
568+
} else {
569+
DINGO_LOG(DEBUG) << fmt::format("[sdk.txn.{}] precommit use sequential prewrite.", ID());
570+
return PreCommit2PCSequential();
571+
}
572+
}
573+
574+
Status TxnImpl::PreCommit2PCSequential() {
557575
// primary key map
558576
std::map<std::string, const TxnMutation*> mutations_map_primary_key;
559577
mutations_map_primary_key.emplace(
@@ -566,8 +584,10 @@ Status TxnImpl::PreCommit2PC() {
566584
}
567585
mutations_map_ordinary_keys.emplace(std::make_pair(key, &mutation));
568586
}
569-
bool is_one_pc = is_one_pc_.load();
570-
CHECK(!is_one_pc) << fmt::format("[sdk.txn.{}] precommit 2pc but is_one_pc is true.", ID());
587+
588+
// precommit primary key
589+
DINGO_LOG(DEBUG) << fmt::format("[sdk.txn.{}] precommit primary key.", ID());
590+
bool is_one_pc = false;
571591
bool use_async_commit = use_async_commit_.load();
572592
uint64_t min_commit_ts = 0;
573593
TxnPrewriteTask task_primary(stub_, buffer_->GetPrimaryKey(), mutations_map_primary_key, shared_from_this(),
@@ -584,14 +604,9 @@ Status TxnImpl::PreCommit2PC() {
584604
if (!use_async_commit) {
585605
use_async_commit_.store(false);
586606
} else {
587-
CHECK(min_commit_ts > 0) << fmt::format("[sdk.txn.{}] min_commit_ts({}) invalid.", ID(), min_commit_ts);
588-
min_commit_ts = std::max(min_commit_ts, commit_ts_.load());
589-
commit_ts_.store(min_commit_ts);
607+
UpdateAsyncCommitTs(min_commit_ts);
590608
}
591609

592-
// 2pc need schedule heartbeat to update lock ttl
593-
ScheduleHeartBeat();
594-
595610
// precommit ordinary keys
596611
DINGO_LOG(DEBUG) << fmt::format("[sdk.txn.{}] precommit ordinary keys.", ID());
597612
min_commit_ts = 0;
@@ -607,14 +622,57 @@ Status TxnImpl::PreCommit2PC() {
607622
if (!use_async_commit) {
608623
use_async_commit_.store(false);
609624
} else {
610-
CHECK(min_commit_ts > 0) << fmt::format("[sdk.txn.{}] min_commit_ts({}) invalid.", ID(), min_commit_ts);
611-
min_commit_ts = std::max(min_commit_ts, commit_ts_.load());
612-
commit_ts_.store(min_commit_ts);
625+
UpdateAsyncCommitTs(min_commit_ts);
626+
}
627+
628+
return Status::OK();
629+
}
630+
631+
Status TxnImpl::PreCommit2PCConcurrent() {
632+
// all keys map
633+
std::map<std::string, const TxnMutation*> mutations_map_all_keys;
634+
// ordinary keys map for async commit
635+
std::map<std::string, const TxnMutation*> mutations_map_ordinary_keys;
636+
bool use_async_commit = use_async_commit_.load();
637+
for (const auto& [key, mutation] : buffer_->Mutations()) {
638+
if (key == buffer_->GetPrimaryKey()) {
639+
mutations_map_all_keys.emplace(std::make_pair(key, &mutation));
640+
continue;
641+
}
642+
mutations_map_all_keys.emplace(std::make_pair(key, &mutation));
643+
if (use_async_commit) {
644+
// for async commit, need to save ordinary keys info
645+
mutations_map_ordinary_keys.emplace(std::make_pair(key, &mutation));
646+
}
647+
}
648+
649+
bool is_one_pc = false;
650+
uint64_t min_commit_ts = 0;
651+
TxnPrewriteTask task_primary(stub_, buffer_->GetPrimaryKey(), mutations_map_all_keys, shared_from_this(),
652+
mutations_map_ordinary_keys, is_one_pc, use_async_commit, min_commit_ts);
653+
654+
Status status = task_primary.Run();
655+
if (!status.ok()) {
656+
DINGO_LOG(WARNING) << fmt::format("[sdk.txn.{}] 2pc concurrent precommit keys fail, status({}).", ID(),
657+
status.ToString());
658+
return status;
659+
}
660+
661+
if (!use_async_commit) {
662+
use_async_commit_.store(false);
663+
} else {
664+
UpdateAsyncCommitTs(min_commit_ts);
613665
}
614666

615667
return Status::OK();
616668
}
617669

670+
void TxnImpl::UpdateAsyncCommitTs(uint64_t min_commit_ts) {
671+
CHECK(min_commit_ts > 0) << fmt::format("[sdk.txn.{}] min_commit_ts({}) invalid.", ID(), min_commit_ts);
672+
min_commit_ts = std::max(min_commit_ts, commit_ts_.load());
673+
commit_ts_.store(min_commit_ts);
674+
}
675+
618676
Status TxnImpl::CommitPrimaryKey() {
619677
std::vector<std::string> keys = {buffer_->GetPrimaryKey()};
620678
Status status;

src/sdk/transaction/txn_impl.h

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -125,10 +125,14 @@ class TxnImpl : public std::enable_shared_from_this<TxnImpl> {
125125

126126
Status Rollback();
127127

128+
void ScheduleHeartBeat();
129+
128130
bool IsOnePc() const { return is_one_pc_.load(); }
129131

130132
bool IsAsyncCommit() const { return !is_one_pc_.load() && use_async_commit_.load(); }
131133

134+
bool IsConcurrentPreCommit() const { return use_concurrent_precommit_.load(); }
135+
132136
int64_t GetStartTs() const { return start_ts_.load(); }
133137
int64_t GetCommitTs() const { return commit_ts_.load(); }
134138
std::string GetPrimaryKey() const { return buffer_->GetPrimaryKey(); }
@@ -204,6 +208,10 @@ class TxnImpl : public std::enable_shared_from_this<TxnImpl> {
204208
Status DoPreCommit();
205209
Status PreCommit1PC();
206210
Status PreCommit2PC();
211+
Status PreCommit2PCConcurrent();
212+
Status PreCommit2PCSequential();
213+
214+
void UpdateAsyncCommitTs(uint64_t min_commit_ts);
207215

208216
// txn commit
209217
Status CommitPrimaryKey();
@@ -219,7 +227,6 @@ class TxnImpl : public std::enable_shared_from_this<TxnImpl> {
219227
Status DoRollback();
220228

221229
void DoHeartBeat(int64_t start_ts, std::string primary_key);
222-
void ScheduleHeartBeat();
223230

224231
void CheckStateActive() const;
225232

@@ -235,6 +242,7 @@ class TxnImpl : public std::enable_shared_from_this<TxnImpl> {
235242

236243
std::atomic<bool> is_one_pc_{false};
237244
std::atomic<bool> use_async_commit_{false};
245+
std::atomic<bool> use_concurrent_precommit_{false};
238246

239247
TxnBufferUPtr buffer_;
240248

src/sdk/transaction/txn_lock_resolver.cc

Lines changed: 43 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
#include <fmt/format.h>
1818
#include <fmt/ranges.h>
19+
#include <unistd.h>
1920

2021
#include <algorithm>
2122
#include <cstdint>
@@ -27,9 +28,11 @@
2728
#include "dingosdk/status.h"
2829
#include "glog/logging.h"
2930
#include "sdk/client_stub.h"
31+
#include "sdk/common/param_config.h"
3032
#include "sdk/transaction/txn_task/txn_check_secondary_locks_task.h"
3133
#include "sdk/transaction/txn_task/txn_check_status_task.h"
3234
#include "sdk/transaction/txn_task/txn_resolve_lock_task.h"
35+
#include "sdk/utils/async_util.h"
3336

3437
namespace dingodb {
3538
namespace sdk {
@@ -43,18 +46,48 @@ Status TxnLockResolver::ResolveLock(const pb::store::LockInfo& conflict_lock_inf
4346

4447
// check primary key lock status
4548
TxnStatus txn_status;
46-
TxnCheckStatusTask task_check_status(stub_, conflict_lock_info.lock_ts(), conflict_lock_info.primary_lock(), start_ts,
47-
txn_status, force_sync_commit);
48-
Status status = task_check_status.Run();
49-
if (!status.ok()) {
50-
if (status.IsNotFound()) {
51-
DINGO_LOG(DEBUG) << fmt::format("[sdk.txn.{}] not exist txn when check status, status({}) lock({}).", start_ts,
52-
status.ToString(), conflict_lock_info.ShortDebugString());
53-
54-
return Status::OK();
49+
Status status;
50+
int64_t retry_times = 0;
51+
int64_t max_retry_times = (FLAGS_txn_heartbeat_lock_delay_ms / FLAGS_txn_check_status_interval_ms) + 1;
52+
bool rollback_if_not_exist = false;
53+
while (retry_times < max_retry_times) {
54+
status = Status();
55+
txn_status = TxnStatus();
56+
TxnCheckStatusTask task_check_status(stub_, conflict_lock_info.lock_ts(), conflict_lock_info.primary_lock(),
57+
start_ts, txn_status, force_sync_commit, rollback_if_not_exist);
58+
status = task_check_status.Run();
59+
if (!status.ok()) {
60+
if (status.IsTxnNotFound()) {
61+
// check lock ttl expired
62+
int64_t current_ts;
63+
Status status1 = stub_.GetTsoProvider()->GenPhysicalTs(2, current_ts);
64+
if (!status1.ok()) {
65+
return status1;
66+
}
67+
if (conflict_lock_info.lock_ttl() > current_ts) {
68+
// lock ttl not expired, wait and retry
69+
SleepUs(FLAGS_txn_check_status_interval_ms * 1000);
70+
} else {
71+
// lock ttl expired, next check will rollback txn if not exist
72+
rollback_if_not_exist = true;
73+
}
74+
} else {
75+
return status;
76+
}
5577
} else {
56-
return status;
78+
break;
5779
}
80+
retry_times++;
81+
}
82+
83+
if (!status.ok()) {
84+
DINGO_LOG(WARNING) << fmt::format(
85+
"[sdk.txn.{}] check primary lock status fail, retry_times({}), max_retry_times({}), interval_ms({}) "
86+
"rollback_if_not_exist({}), "
87+
"lock_info({}), status({}).",
88+
start_ts, retry_times, max_retry_times, FLAGS_txn_check_status_interval_ms, rollback_if_not_exist,
89+
conflict_lock_info.ShortDebugString(), status.ToString());
90+
return status;
5891
}
5992

6093
if (txn_status.primary_lock_info.lock_ts() > 0 && txn_status.primary_lock_info.use_async_commit() &&

src/sdk/transaction/txn_lock_resolver.h

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -101,12 +101,12 @@ class TxnLockResolver {
101101
virtual Status ResolveLock(const pb::store::LockInfo& conflict_lock_info, int64_t start_ts,
102102
bool force_sync_commit = false);
103103

104-
virtual Status ResolveLockSecondaryLocks(const pb::store::LockInfo& primary_lock_info, int64_t start_ts,
105-
const TxnStatus& txn_status, const pb::store::LockInfo& conflict_lock_info);
104+
private:
105+
Status ResolveLockSecondaryLocks(const pb::store::LockInfo& primary_lock_info, int64_t start_ts,
106+
const TxnStatus& txn_status, const pb::store::LockInfo& conflict_lock_info);
106107

107-
virtual Status ResolveNormalLock(const pb::store::LockInfo& lock_info, int64_t start_ts, const TxnStatus& txn_status);
108+
Status ResolveNormalLock(const pb::store::LockInfo& lock_info, int64_t start_ts, const TxnStatus& txn_status);
108109

109-
private:
110110
const ClientStub& stub_;
111111
};
112112

src/sdk/transaction/txn_task/txn_check_status_task.cc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ void TxnCheckStatusTask::DoAsync() {
5656
rpc_.MutableRequest()->set_caller_start_ts(start_ts_);
5757
rpc_.MutableRequest()->set_current_ts(current_ts);
5858
rpc_.MutableRequest()->set_force_sync_commit(force_sync_commit_);
59+
rpc_.MutableRequest()->set_rollback_if_not_exist(rollback_if_not_exist_);
5960

6061
store_rpc_controller_.ResetRegion(region);
6162
store_rpc_controller_.AsyncCall([this](auto&& s) { TxnCheckStatusRpcCallback(std::forward<decltype(s)>(s)); });

src/sdk/transaction/txn_task/txn_check_status_task.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,13 +31,14 @@ namespace sdk {
3131
class TxnCheckStatusTask : public TxnTask {
3232
public:
3333
TxnCheckStatusTask(const ClientStub& stub, int64_t lock_ts, const std::string& primary_key, int64_t start_ts,
34-
TxnStatus& txn_status, bool force_sync_commit = false)
34+
TxnStatus& txn_status, bool force_sync_commit = false, bool rollback_if_not_exist = false)
3535
: TxnTask(stub),
3636
lock_ts_(lock_ts),
3737
primary_key_(primary_key),
3838
start_ts_(start_ts),
3939
txn_status_(txn_status),
4040
force_sync_commit_(force_sync_commit),
41+
rollback_if_not_exist_(rollback_if_not_exist),
4142
store_rpc_controller_(stub, rpc_) {}
4243

4344
~TxnCheckStatusTask() override = default;
@@ -57,6 +58,7 @@ class TxnCheckStatusTask : public TxnTask {
5758
TxnCheckTxnStatusRpc rpc_;
5859
bool force_sync_commit_{false};
5960
uint64_t resolved_lock_{0};
61+
bool rollback_if_not_exist_{false};
6062

6163
RWLock rw_lock_;
6264
Status status_;

0 commit comments

Comments
 (0)