From 2d8a8c0f42cbba510ac887debb27d48d5e2db0d9 Mon Sep 17 00:00:00 2001 From: Zhang Yanpo Date: Sun, 16 Mar 2025 15:15:41 +0800 Subject: [PATCH 1/3] chore: minor code format --- src/meta/types/src/raft_types.rs | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/src/meta/types/src/raft_types.rs b/src/meta/types/src/raft_types.rs index 40037180ddb63..6a42d4db4c711 100644 --- a/src/meta/types/src/raft_types.rs +++ b/src/meta/types/src/raft_types.rs @@ -14,6 +14,7 @@ //! This mod wraps openraft types that have generics parameter with concrete types. +use openraft::error::Infallible; use openraft::impls::OneshotResponder; use openraft::RaftTypeConfig; use openraft::TokioRuntime; @@ -54,17 +55,15 @@ pub type Entry = openraft::Entry; pub type SnapshotMeta = openraft::SnapshotMeta; pub type Snapshot = openraft::Snapshot; -#[allow(dead_code)] -pub type SnapshotSegmentId = openraft::SnapshotSegmentId; pub type RaftMetrics = openraft::RaftMetrics; pub type ErrorSubject = openraft::ErrorSubject; pub type ErrorVerb = openraft::ErrorVerb; -pub type RPCError = openraft::error::RPCError; +pub type RPCError = openraft::error::RPCError; pub type RemoteError = openraft::error::RemoteError; -pub type RaftError = openraft::error::RaftError; +pub type RaftError = openraft::error::RaftError; pub type NetworkError = openraft::error::NetworkError; pub type StorageError = openraft::StorageError; @@ -73,8 +72,7 @@ pub type Fatal = openraft::error::Fatal; pub type ChangeMembershipError = openraft::error::ChangeMembershipError; pub type ClientWriteError = openraft::error::ClientWriteError; pub type InitializeError = openraft::error::InitializeError; -pub type StreamingError = - openraft::error::StreamingError; +pub type StreamingError = openraft::error::StreamingError; pub type AppendEntriesRequest = openraft::raft::AppendEntriesRequest; pub type AppendEntriesResponse = openraft::raft::AppendEntriesResponse; From 047bca895598bc255ff7f98b48d9bd9e7d65a754 Mon Sep 17 00:00:00 2001 From: Zhang Yanpo Date: Sun, 16 Mar 2025 23:16:28 +0800 Subject: [PATCH 2/3] refactor(meta): upgrade OpenRaft to v0.10.0-alpha.9 Major changes since v0.10.0-alpha.7 to v0.10.0-alpha.9 : - Added: - [9178ef8f](https://github.com/databendlabs/openraft/commit/9178ef8f1d49de0c85fb835d559d53c5876b2c64) Retrieve Key Log IDs via `RaftLogReader::get_key_log_ids()`. - [636664de](https://github.com/databendlabs/openraft/commit/636664dec5fda5b396b7a366801e7a02f1e23225) Abstract Term. - [d1bd8a24](https://github.com/databendlabs/openraft/commit/d1bd8a24bb9c7578a247c3738a2966d99b8cb783) Abstract `LeaderId` and `CommittedLeaderId`. - [a7899729](https://github.com/databendlabs/openraft/commit/a7899729d9658171b48dffd4f4ba96382d28cd62) Abstract `Vote`. - [c6a01749](https://github.com/databendlabs/openraft/commit/c6a0174986ea8c03eba41cff657547abe16b4c7b) Add changelog support in the `#[since()]` macro. - [3b76a7e3](https://github.com/databendlabs/openraft/commit/3b76a7e34a286cfff73e4201dc40b7e2bbfe99dd) New `RaftEntry` methods: `log_id()` and `index()`. - Changed: - [c8813d84](https://github.com/databendlabs/openraft/commit/c8813d84d54d503e7dcb7af0fc9926e6b4720ec9) Replace `loosen-follower-log-revert` feature flag with `Config::allow_log_reversion`. - [03437e14](https://github.com/databendlabs/openraft/commit/03437e14214407ad4da0adde5ebcf786a6701b46) Membership::new_with_defaults(). - [57146747](https://github.com/databendlabs/openraft/commit/5714674701234608f31889399993740e429533f8) change `Vote` to `Vote`. - [4d362901](https://github.com/databendlabs/openraft/commit/4d362901be7c4b28374b7c5ea0faa1164363d6cd) change `LogId` to `LogId`. - [429a9fde](https://github.com/databendlabs/openraft/commit/429a9fdebf247a64d5aa4ac316e2b2afb39ede1b) change `LeaderId` to `LeaderId`. - [d1b41efb](https://github.com/databendlabs/openraft/commit/d1b41efb9050c50eefdd4154dd95c2afd95d1b9c) Remove feature flag `single-term-leader`. - [3f5cbca4](https://github.com/databendlabs/openraft/commit/3f5cbca49debb8057095b00c2fe8fccd50680ceb) `RaftPayload::get_membership()` now returns an owned `Membership`. - [930b4a3d](https://github.com/databendlabs/openraft/commit/930b4a3da0d289c6ce14fcbd21221be94785edc6) remove unused RaftPayload::is_blank(). - [7bebecb9](https://github.com/databendlabs/openraft/commit/7bebecb9ab6eab9a292e0409d4e0826ece4fb44f) Refine Log Entry Traits. - [40e5b1ae](https://github.com/databendlabs/openraft/commit/40e5b1ae238c214fc0c16282474bdce54ac9b0c6) Simplify `send_snapshot()` error type in `Chunked`. - [926bf6de](https://github.com/databendlabs/openraft/commit/926bf6de1131abcc63e7f08ec190e6e0873faefd) Remove `RemoteError` variant from `StreamingError`. - [73fbae1e](https://github.com/databendlabs/openraft/commit/73fbae1ec95bf8c0ef111c17f3512f40602c3702) Remove Box from `Snapshot::snapshot`. - [ac2b7203](https://github.com/databendlabs/openraft/commit/ac2b7203b0be0343cba4e624c52e3e3bc2eecce9) fix lint: too large: StorageError and SnapshotSignature. - [96173030](https://github.com/databendlabs/openraft/commit/96173030acd39811d381413cfdaf8c1e13ac453b) Track commit and apply progress with `io_state.apply_progress`. --- Cargo.lock | 26 +++---------------- Cargo.toml | 3 +-- src/meta/control/src/import.rs | 6 ++--- .../sm_v003/compact_immutable_levels_test.rs | 8 +++--- .../src/sm_v003/compact_with_db_test.rs | 16 ++++++------ src/meta/raft-store/src/sm_v003/sm_v003.rs | 4 +-- .../raft-store/src/state_machine/testing.rs | 8 ++++-- .../service/src/meta_service/meta_node.rs | 10 +++---- .../src/meta_service/raft_service_impl.rs | 4 +-- .../src/store/raft_log_storage_impl.rs | 8 +++--- .../src/store/raft_state_machine_impl.rs | 8 +++--- src/meta/service/src/store/store_inner.rs | 2 +- src/meta/service/tests/it/store.rs | 4 +-- .../types/src/proto_ext/raft_types_ext.rs | 6 +---- src/meta/types/src/raft_types.rs | 13 ++++++---- 15 files changed, 54 insertions(+), 72 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index b6579e5310e7d..64f93398d5968 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3567,30 +3567,10 @@ dependencies = [ name = "databend-common-meta-app-types" version = "0.1.0" dependencies = [ - "anyerror", "anyhow", - "databend-common-base", - "databend-common-building", - "databend-common-exception", - "databend-common-meta-stoerr", - "databend-common-tracing", - "deepsize", - "derive_more", - "futures-util", - "log", - "map-api", "num-derive", - "num-traits", - "openraft", "prost", - "prost-build", - "rotbl", "serde", - "serde_json", - "thiserror 1.0.65", - "tokio", - "tonic", - "tonic-build", ] [[package]] @@ -10597,7 +10577,7 @@ dependencies = [ [[package]] name = "openraft" version = "0.10.0" -source = "git+https://github.com/databendlabs/openraft?tag=v0.10.0-alpha.7#e99cfebbc7a485fe55efe476c083d9decc888ef7" +source = "git+https://github.com/databendlabs/openraft?tag=v0.10.0-alpha.9#a529931e633e641cac09d078ed815a0d7c15a3c3" dependencies = [ "anyerror", "byte-unit", @@ -10607,7 +10587,7 @@ dependencies = [ "futures", "maplit", "openraft-macros", - "rand 0.8.5", + "rand 0.9.0", "serde", "thiserror 1.0.65", "tokio", @@ -10619,7 +10599,7 @@ dependencies = [ [[package]] name = "openraft-macros" version = "0.10.0" -source = "git+https://github.com/databendlabs/openraft?tag=v0.10.0-alpha.7#e99cfebbc7a485fe55efe476c083d9decc888ef7" +source = "git+https://github.com/databendlabs/openraft?tag=v0.10.0-alpha.9#a529931e633e641cac09d078ed815a0d7c15a3c3" dependencies = [ "chrono", "proc-macro2", diff --git a/Cargo.toml b/Cargo.toml index b7ee9da0a95f4..1fb9408b11f97 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -397,7 +397,6 @@ opendal = { version = "0.51.2", features = [ openraft = { version = "0.10.0", features = [ "serde", "tracing-log", - "loosen-follower-log-revert", # allows removing all data from a follower and restoring from the leader. ] } opensrv-mysql = { git = "https://github.com/databendlabs/opensrv.git", rev = "a1fb4da", features = ["tls"] } orc-rust = "0.5.0" @@ -636,7 +635,7 @@ deltalake = { git = "https://github.com/delta-io/delta-rs", rev = "c149502" } ethnum = { git = "https://github.com/datafuse-extras/ethnum-rs", rev = "4cb05f1" } map-api = { git = "https://github.com/databendlabs/map-api", tag = "v0.2.3" } openai_api_rust = { git = "https://github.com/datafuse-extras/openai-api", rev = "819a0ed" } -openraft = { git = "https://github.com/databendlabs/openraft", tag = "v0.10.0-alpha.7" } +openraft = { git = "https://github.com/databendlabs/openraft", tag = "v0.10.0-alpha.9" } orc-rust = { git = "https://github.com/youngsofun/orc-rust", rev = "94ab8e9" } recursive = { git = "https://github.com/datafuse-extras/recursive.git", rev = "6af35a1" } sled = { git = "https://github.com/datafuse-extras/sled", tag = "v0.34.7-datafuse.1" } diff --git a/src/meta/control/src/import.rs b/src/meta/control/src/import.rs index 69583b1d2b979..9c758f6ef15bf 100644 --- a/src/meta/control/src/import.rs +++ b/src/meta/control/src/import.rs @@ -38,7 +38,7 @@ use databend_common_meta_sled_store::init_get_sled_db; use databend_common_meta_sled_store::openraft::storage::RaftLogStorageExt; use databend_common_meta_sled_store::openraft::RaftSnapshotBuilder; use databend_common_meta_types::node::Node; -use databend_common_meta_types::raft_types::CommittedLeaderId; +use databend_common_meta_types::raft_types::new_log_id; use databend_common_meta_types::raft_types::Entry; use databend_common_meta_types::raft_types::EntryPayload; use databend_common_meta_types::raft_types::LogId; @@ -314,10 +314,10 @@ async fn init_new_cluster( }; let last_log_id = std::cmp::max(last_applied, max_log_id); - let mut log_id = last_log_id.unwrap_or(LogId::new(CommittedLeaderId::new(0, 0), 0)); + let mut log_id = last_log_id.unwrap_or(new_log_id(0, 0, 0)); let node_ids = nodes.keys().copied().collect::>(); - let membership = Membership::new(vec![node_ids], ()); + let membership = Membership::new(vec![node_ids], ())?; // Update snapshot: Replace nodes set and membership config. { diff --git a/src/meta/raft-store/src/sm_v003/compact_immutable_levels_test.rs b/src/meta/raft-store/src/sm_v003/compact_immutable_levels_test.rs index 38136c5554ee2..3e5ffb6d34669 100644 --- a/src/meta/raft-store/src/sm_v003/compact_immutable_levels_test.rs +++ b/src/meta/raft-store/src/sm_v003/compact_immutable_levels_test.rs @@ -51,7 +51,7 @@ async fn test_compact_copied_value_and_kv() -> anyhow::Result<()> { assert_eq!(compacted.iter_immutable_levels().count(), 1); assert_eq!( d.last_membership_ref(), - &StoredMembership::new(Some(log_id(3, 3, 3)), Membership::new(vec![], ())) + &StoredMembership::new(Some(log_id(3, 3, 3)), Membership::new(vec![], ())?) ); assert_eq!(d.last_applied_ref(), &Some(log_id(3, 3, 3))); assert_eq!( @@ -204,7 +204,7 @@ async fn build_3_levels() -> anyhow::Result { let sd = lm.writable_mut().sys_data_mut(); *sd.last_membership_mut() = - StoredMembership::new(Some(log_id(1, 1, 1)), Membership::new(vec![], ())); + StoredMembership::new(Some(log_id(1, 1, 1)), Membership::new(vec![], ())?); *sd.last_applied_mut() = Some(log_id(1, 1, 1)); *sd.nodes_mut() = btreemap! {1=>Node::new("1", Endpoint::new("1", 1))}; @@ -218,7 +218,7 @@ async fn build_3_levels() -> anyhow::Result { let sd = lm.writable_mut().sys_data_mut(); *sd.last_membership_mut() = - StoredMembership::new(Some(log_id(2, 2, 2)), Membership::new(vec![], ())); + StoredMembership::new(Some(log_id(2, 2, 2)), Membership::new(vec![], ())?); *sd.last_applied_mut() = Some(log_id(2, 2, 2)); *sd.nodes_mut() = btreemap! {2=>Node::new("2", Endpoint::new("2", 2))}; @@ -231,7 +231,7 @@ async fn build_3_levels() -> anyhow::Result { let sd = lm.writable_mut().sys_data_mut(); *sd.last_membership_mut() = - StoredMembership::new(Some(log_id(3, 3, 3)), Membership::new(vec![], ())); + StoredMembership::new(Some(log_id(3, 3, 3)), Membership::new(vec![], ())?); *sd.last_applied_mut() = Some(log_id(3, 3, 3)); *sd.nodes_mut() = btreemap! {3=>Node::new("3", Endpoint::new("3", 3))}; diff --git a/src/meta/raft-store/src/sm_v003/compact_with_db_test.rs b/src/meta/raft-store/src/sm_v003/compact_with_db_test.rs index 095b699a36306..2f45db1daf830 100644 --- a/src/meta/raft-store/src/sm_v003/compact_with_db_test.rs +++ b/src/meta/raft-store/src/sm_v003/compact_with_db_test.rs @@ -44,7 +44,7 @@ async fn test_leveled_query_with_db() -> anyhow::Result<()> { assert_eq!(lm.curr_seq(), 7); assert_eq!( lm.last_membership_ref(), - &StoredMembership::new(Some(log_id(3, 3, 3)), Membership::new(vec![], ())) + &StoredMembership::new(Some(log_id(3, 3, 3)), Membership::new(vec![], ())?) ); assert_eq!(lm.last_applied_ref(), &Some(log_id(3, 3, 3))); assert_eq!( @@ -93,7 +93,7 @@ async fn test_leveled_query_with_expire_index() -> anyhow::Result<()> { assert_eq!(lm.curr_seq(), 4); assert_eq!( lm.last_membership_ref(), - &StoredMembership::new(None, Membership::new(vec![], ())) + &StoredMembership::new(None, Membership::new(vec![], ())?) ); assert_eq!(lm.last_applied_ref(), &None); assert_eq!(lm.nodes_ref(), &btreemap! {}); @@ -161,7 +161,7 @@ async fn test_compact() -> anyhow::Result<()> { assert_eq!(db.curr_seq(), 7); assert_eq!( db.last_membership_ref(), - &StoredMembership::new(Some(log_id(3, 3, 3)), Membership::new(vec![], ())) + &StoredMembership::new(Some(log_id(3, 3, 3)), Membership::new(vec![], ())?) ); assert_eq!(db.last_applied_ref(), &Some(log_id(3, 3, 3))); assert_eq!( @@ -210,7 +210,7 @@ async fn test_compact_expire_index() -> anyhow::Result<()> { assert_eq!(db.curr_seq(), 4); assert_eq!( db.last_membership_ref(), - &StoredMembership::new(None, Membership::new(vec![], ())) + &StoredMembership::new(None, Membership::new(vec![], ())?) ); assert_eq!(db.last_applied_ref(), &None); assert_eq!(db.nodes_ref(), &btreemap! {}); @@ -274,7 +274,7 @@ async fn test_compact_output_3_level() -> anyhow::Result<()> { assert_eq!(sys_data.curr_seq(), 7); assert_eq!( sys_data.last_membership_ref(), - &StoredMembership::new(Some(log_id(3, 3, 3)), Membership::new(vec![], ())) + &StoredMembership::new(Some(log_id(3, 3, 3)), Membership::new(vec![], ()).unwrap()) ); assert_eq!(sys_data.last_applied_ref(), &Some(log_id(3, 3, 3))); assert_eq!( @@ -306,7 +306,7 @@ async fn build_3_levels() -> anyhow::Result<(LeveledMap, impl Drop)> { let sd = lm.writable_mut().sys_data_mut(); *sd.last_membership_mut() = - StoredMembership::new(Some(log_id(1, 1, 1)), Membership::new(vec![], ())); + StoredMembership::new(Some(log_id(1, 1, 1)), Membership::new(vec![], ())?); *sd.last_applied_mut() = Some(log_id(1, 1, 1)); *sd.nodes_mut() = btreemap! {1=>Node::new("1", Endpoint::new("1", 1))}; @@ -320,7 +320,7 @@ async fn build_3_levels() -> anyhow::Result<(LeveledMap, impl Drop)> { let sd = lm.writable_mut().sys_data_mut(); *sd.last_membership_mut() = - StoredMembership::new(Some(log_id(2, 2, 2)), Membership::new(vec![], ())); + StoredMembership::new(Some(log_id(2, 2, 2)), Membership::new(vec![], ())?); *sd.last_applied_mut() = Some(log_id(2, 2, 2)); *sd.nodes_mut() = btreemap! {2=>Node::new("2", Endpoint::new("2", 2))}; @@ -333,7 +333,7 @@ async fn build_3_levels() -> anyhow::Result<(LeveledMap, impl Drop)> { let sd = lm.writable_mut().sys_data_mut(); *sd.last_membership_mut() = - StoredMembership::new(Some(log_id(3, 3, 3)), Membership::new(vec![], ())); + StoredMembership::new(Some(log_id(3, 3, 3)), Membership::new(vec![], ())?); *sd.last_applied_mut() = Some(log_id(3, 3, 3)); *sd.nodes_mut() = btreemap! {3=>Node::new("3", Endpoint::new("3", 3))}; diff --git a/src/meta/raft-store/src/sm_v003/sm_v003.rs b/src/meta/raft-store/src/sm_v003/sm_v003.rs index 5645d77db943f..8ee064204dfde 100644 --- a/src/meta/raft-store/src/sm_v003/sm_v003.rs +++ b/src/meta/raft-store/src/sm_v003/sm_v003.rs @@ -21,7 +21,7 @@ use databend_common_meta_types::snapshot_db::DB; use databend_common_meta_types::sys_data::SysData; use databend_common_meta_types::AppliedState; use log::info; -use openraft::RaftLogId; +use openraft::entry::RaftEntry; use crate::applier::Applier; use crate::leveled_store::leveled_map::compactor::Compactor; @@ -137,7 +137,7 @@ impl SMV003 { let mut res = vec![]; for ent in entries.into_iter() { - let log_id = *ent.get_log_id(); + let log_id = ent.log_id(); let r = applier .apply(&ent) .await diff --git a/src/meta/raft-store/src/state_machine/testing.rs b/src/meta/raft-store/src/state_machine/testing.rs index cc4a8dd31fa77..5ca229802ced8 100644 --- a/src/meta/raft-store/src/state_machine/testing.rs +++ b/src/meta/raft-store/src/state_machine/testing.rs @@ -29,7 +29,9 @@ pub fn snapshot_logs() -> (Vec, Vec) { let logs = vec![ Entry { log_id: new_log_id(1, 0, 1), - payload: EntryPayload::Membership(Membership::new(vec![btreeset![1, 2, 3]], ())), + payload: EntryPayload::Membership( + Membership::new(vec![btreeset![1, 2, 3]], ()).unwrap(), + ), }, Entry::new_blank(new_log_id(1, 0, 2)), Entry::new_blank(new_log_id(1, 0, 3)), @@ -43,7 +45,9 @@ pub fn snapshot_logs() -> (Vec, Vec) { }, Entry { log_id: new_log_id(1, 0, 5), - payload: EntryPayload::Membership(Membership::new(vec![btreeset![4, 5, 6]], ())), + payload: EntryPayload::Membership( + Membership::new(vec![btreeset![4, 5, 6]], ()).unwrap(), + ), }, Entry { log_id: new_log_id(1, 0, 6), diff --git a/src/meta/service/src/meta_service/meta_node.rs b/src/meta/service/src/meta_service/meta_node.rs index 6bb7f3ae56d15..61eb900a79aa3 100644 --- a/src/meta/service/src/meta_service/meta_node.rs +++ b/src/meta/service/src/meta_service/meta_node.rs @@ -43,10 +43,9 @@ use databend_common_meta_types::protobuf::raft_service_server::RaftServiceServer use databend_common_meta_types::protobuf::watch_request::FilterType; use databend_common_meta_types::protobuf::WatchRequest; use databend_common_meta_types::protobuf::WatchResponse; -use databend_common_meta_types::raft_types::CommittedLeaderId; +use databend_common_meta_types::raft_types::new_log_id; use databend_common_meta_types::raft_types::ForwardToLeader; use databend_common_meta_types::raft_types::InitializeError; -use databend_common_meta_types::raft_types::LogId; use databend_common_meta_types::raft_types::MembershipNode; use databend_common_meta_types::raft_types::NodeId; use databend_common_meta_types::raft_types::RaftMetrics; @@ -243,6 +242,9 @@ impl MetaNode { snapshot_policy: SnapshotPolicy::LogsSinceLast(config.snapshot_logs_since_last), max_in_snapshot_log_to_keep: config.max_applied_log_to_keep, snapshot_max_chunk_size: config.snapshot_chunk_size, + // Allow Leader to reset replication if a follower clears its log. + // Usefull in a testing environment. + allow_log_reversion: Some(true), ..Default::default() } .validate() @@ -907,9 +909,7 @@ impl MetaNode { is_leader: metrics.state == openraft::ServerState::Leader, current_term: metrics.current_term, last_log_index: metrics.last_log_index.unwrap_or(0), - last_applied: metrics - .last_applied - .unwrap_or(LogId::new(CommittedLeaderId::new(0, 0), 0)), + last_applied: metrics.last_applied.unwrap_or(new_log_id(0, 0, 0)), snapshot_last_log_id: metrics.snapshot, purged: metrics.purged, leader, diff --git a/src/meta/service/src/meta_service/raft_service_impl.rs b/src/meta/service/src/meta_service/raft_service_impl.rs index 3e130499e1efb..6583a39e74192 100644 --- a/src/meta/service/src/meta_service/raft_service_impl.rs +++ b/src/meta/service/src/meta_service/raft_service_impl.rs @@ -169,7 +169,7 @@ impl RaftServiceImpl { let snapshot = Snapshot { meta: snapshot_meta.clone(), - snapshot: Box::new(db), + snapshot: db, }; let resp = raft.install_full_snapshot(req_vote, snapshot).await?; @@ -206,7 +206,7 @@ impl RaftServiceImpl { let snapshot = Snapshot { meta: snapshot_meta, - snapshot: Box::new(db), + snapshot: db, }; let res = self diff --git a/src/meta/service/src/store/raft_log_storage_impl.rs b/src/meta/service/src/store/raft_log_storage_impl.rs index bba4e2ebcf603..a4b05f0a48d36 100644 --- a/src/meta/service/src/store/raft_log_storage_impl.rs +++ b/src/meta/service/src/store/raft_log_storage_impl.rs @@ -21,12 +21,12 @@ use databend_common_base::display::display_option::DisplayOptionExt; use databend_common_meta_raft_store::raft_log_v004; use databend_common_meta_raft_store::raft_log_v004::codec_wrapper::Cw; use databend_common_meta_raft_store::raft_log_v004::io_desc::IODesc; +use databend_common_meta_sled_store::openraft::entry::RaftEntry; use databend_common_meta_sled_store::openraft::storage::RaftLogStorage; use databend_common_meta_sled_store::openraft::EntryPayload; use databend_common_meta_sled_store::openraft::LogIdOptionExt; use databend_common_meta_sled_store::openraft::LogState; use databend_common_meta_sled_store::openraft::OptionalSend; -use databend_common_meta_sled_store::openraft::RaftLogId; use databend_common_meta_sled_store::openraft::RaftLogReader; use databend_common_meta_types::raft_types::Entry; use databend_common_meta_types::raft_types::IOFlushed; @@ -70,7 +70,7 @@ impl RaftLogReader for RaftStore { debug!( "RaftStore::limited_get_log_entries: got log: log_id: {}, size: {}", - ent.get_log_id(), + ent.log_id(), size ); @@ -82,8 +82,8 @@ impl RaftLogReader for RaftStore { "RaftStore::limited_get_log_entries: too many logs, early return: entries cnt: {}, total size: {}, res: [{}, {}]", res.len(), total_size, - res.first().map(|x| x.get_log_id()).unwrap(), - res.last().map(|x| x.get_log_id()).unwrap(), + res.first().map(|x| x.log_id()).unwrap(), + res.last().map(|x| x.log_id()).unwrap(), ); return Ok(res); diff --git a/src/meta/service/src/store/raft_state_machine_impl.rs b/src/meta/service/src/store/raft_state_machine_impl.rs index 58476898b7b11..02616045ad131 100644 --- a/src/meta/service/src/store/raft_state_machine_impl.rs +++ b/src/meta/service/src/store/raft_state_machine_impl.rs @@ -74,19 +74,19 @@ impl RaftStateMachine for RaftStore { // This method is not used #[fastrace::trace] - async fn begin_receiving_snapshot(&mut self) -> Result, StorageError> { + async fn begin_receiving_snapshot(&mut self) -> Result { let ss_store = SnapshotStoreV004::new(self.inner.config.clone()); let db = ss_store .new_temp() .map_err(|e| StorageError::write_snapshot(None, &e))?; - Ok(Box::new(db)) + Ok(db) } #[fastrace::trace] async fn install_snapshot( &mut self, meta: &SnapshotMeta, - snapshot: Box, + snapshot: DB, ) -> Result<(), StorageError> { let data_size = snapshot.file_size(); @@ -131,7 +131,7 @@ impl RaftStateMachine for RaftStore { let snapshot = db.map(|x| Snapshot { meta: x.snapshot_meta().clone(), - snapshot: Box::new(x), + snapshot: x, }); info!( diff --git a/src/meta/service/src/store/store_inner.rs b/src/meta/service/src/store/store_inner.rs index a6936dbfa229c..f5dbb3dea2d0f 100644 --- a/src/meta/service/src/store/store_inner.rs +++ b/src/meta/service/src/store/store_inner.rs @@ -270,7 +270,7 @@ impl RaftStoreInner { Ok(Snapshot { meta: snapshot_meta, - snapshot: Box::new(db), + snapshot: db, }) } diff --git a/src/meta/service/tests/it/store.rs b/src/meta/service/tests/it/store.rs index faebd665aca1a..c18a8ccfb3e81 100644 --- a/src/meta/service/tests/it/store.rs +++ b/src/meta/service/tests/it/store.rs @@ -240,7 +240,7 @@ async fn test_meta_store_install_snapshot() -> anyhow::Result<()> { info!("--- install snapshot"); { - sto.do_install_snapshot(data.as_ref().clone()).await?; + sto.do_install_snapshot(data.clone()).await?; } info!("--- check installed meta"); @@ -256,7 +256,7 @@ async fn test_meta_store_install_snapshot() -> anyhow::Result<()> { assert_eq!( StoredMembership::new( Some(log_id(1, 0, 5)), - Membership::new(vec![btreeset! {4,5,6}], ()) + Membership::new(vec![btreeset! {4,5,6}], ())? ), mem ); diff --git a/src/meta/types/src/proto_ext/raft_types_ext.rs b/src/meta/types/src/proto_ext/raft_types_ext.rs index 6c9bc72a398e9..3a3c83da4137e 100644 --- a/src/meta/types/src/proto_ext/raft_types_ext.rs +++ b/src/meta/types/src/proto_ext/raft_types_ext.rs @@ -42,7 +42,6 @@ mod log_id_impls { use crate::protobuf as pb; use crate::raft_types; - use crate::raft_types::CommittedLeaderId; impl From for pb::LogId { fn from(log_id: raft_types::LogId) -> Self { @@ -56,10 +55,7 @@ mod log_id_impls { impl From for raft_types::LogId { fn from(log_id: pb::LogId) -> Self { - raft_types::LogId::new( - CommittedLeaderId::new(log_id.term, log_id.node_id), - log_id.index, - ) + raft_types::new_log_id(log_id.term, log_id.node_id, log_id.index) } } } diff --git a/src/meta/types/src/raft_types.rs b/src/meta/types/src/raft_types.rs index 6a42d4db4c711..1888729994b43 100644 --- a/src/meta/types/src/raft_types.rs +++ b/src/meta/types/src/raft_types.rs @@ -16,13 +16,14 @@ use openraft::error::Infallible; use openraft::impls::OneshotResponder; +pub use openraft::vote::leader_id_adv::CommittedLeaderId; +use openraft::vote::RaftLeaderId; use openraft::RaftTypeConfig; use openraft::TokioRuntime; use crate::snapshot_db::DB; use crate::AppliedState; use crate::LogEntry; - pub type NodeId = u64; pub type MembershipNode = openraft::EmptyNode; pub type LogIndex = u64; @@ -35,6 +36,9 @@ impl RaftTypeConfig for TypeConfig { type R = AppliedState; type NodeId = NodeId; type Node = MembershipNode; + type Term = u64; + type LeaderId = openraft::impls::leader_id_adv::LeaderId; + type Vote = openraft::impls::Vote; type Entry = openraft::entry::Entry; type SnapshotData = DB; type AsyncRuntime = TokioRuntime; @@ -43,9 +47,8 @@ impl RaftTypeConfig for TypeConfig { pub type IOFlushed = openraft::storage::IOFlushed; -pub type CommittedLeaderId = openraft::CommittedLeaderId; -pub type LogId = openraft::LogId; -pub type Vote = openraft::Vote; +pub type LogId = openraft::LogId; +pub type Vote = openraft::Vote; pub type Membership = openraft::Membership; pub type StoredMembership = openraft::StoredMembership; @@ -72,7 +75,7 @@ pub type Fatal = openraft::error::Fatal; pub type ChangeMembershipError = openraft::error::ChangeMembershipError; pub type ClientWriteError = openraft::error::ClientWriteError; pub type InitializeError = openraft::error::InitializeError; -pub type StreamingError = openraft::error::StreamingError; +pub type StreamingError = openraft::error::StreamingError; pub type AppendEntriesRequest = openraft::raft::AppendEntriesRequest; pub type AppendEntriesResponse = openraft::raft::AppendEntriesResponse; From a708b9079760f557f8ae008837f15280e52df947 Mon Sep 17 00:00:00 2001 From: Zhang Yanpo Date: Mon, 17 Mar 2025 11:19:45 +0800 Subject: [PATCH 3/3] chore: create membership with default node --- src/meta/control/src/import.rs | 2 +- .../sm_v003/compact_immutable_levels_test.rs | 23 ++++++++---- .../src/sm_v003/compact_with_db_test.rs | 37 +++++++++++++------ .../raft-store/src/state_machine/testing.rs | 14 ++++--- src/meta/service/tests/it/store.rs | 2 +- 5 files changed, 52 insertions(+), 26 deletions(-) diff --git a/src/meta/control/src/import.rs b/src/meta/control/src/import.rs index 9c758f6ef15bf..f702289408476 100644 --- a/src/meta/control/src/import.rs +++ b/src/meta/control/src/import.rs @@ -317,7 +317,7 @@ async fn init_new_cluster( let mut log_id = last_log_id.unwrap_or(new_log_id(0, 0, 0)); let node_ids = nodes.keys().copied().collect::>(); - let membership = Membership::new(vec![node_ids], ())?; + let membership = Membership::new_with_defaults(vec![node_ids], []); // Update snapshot: Replace nodes set and membership config. { diff --git a/src/meta/raft-store/src/sm_v003/compact_immutable_levels_test.rs b/src/meta/raft-store/src/sm_v003/compact_immutable_levels_test.rs index 3e5ffb6d34669..44476a2290ae6 100644 --- a/src/meta/raft-store/src/sm_v003/compact_immutable_levels_test.rs +++ b/src/meta/raft-store/src/sm_v003/compact_immutable_levels_test.rs @@ -51,7 +51,10 @@ async fn test_compact_copied_value_and_kv() -> anyhow::Result<()> { assert_eq!(compacted.iter_immutable_levels().count(), 1); assert_eq!( d.last_membership_ref(), - &StoredMembership::new(Some(log_id(3, 3, 3)), Membership::new(vec![], ())?) + &StoredMembership::new( + Some(log_id(3, 3, 3)), + Membership::new_with_defaults(vec![], []) + ) ); assert_eq!(d.last_applied_ref(), &Some(log_id(3, 3, 3))); assert_eq!( @@ -203,8 +206,10 @@ async fn build_3_levels() -> anyhow::Result { let mut lm = LeveledMap::default(); let sd = lm.writable_mut().sys_data_mut(); - *sd.last_membership_mut() = - StoredMembership::new(Some(log_id(1, 1, 1)), Membership::new(vec![], ())?); + *sd.last_membership_mut() = StoredMembership::new( + Some(log_id(1, 1, 1)), + Membership::new_with_defaults(vec![], []), + ); *sd.last_applied_mut() = Some(log_id(1, 1, 1)); *sd.nodes_mut() = btreemap! {1=>Node::new("1", Endpoint::new("1", 1))}; @@ -217,8 +222,10 @@ async fn build_3_levels() -> anyhow::Result { lm.freeze_writable(); let sd = lm.writable_mut().sys_data_mut(); - *sd.last_membership_mut() = - StoredMembership::new(Some(log_id(2, 2, 2)), Membership::new(vec![], ())?); + *sd.last_membership_mut() = StoredMembership::new( + Some(log_id(2, 2, 2)), + Membership::new_with_defaults(vec![], []), + ); *sd.last_applied_mut() = Some(log_id(2, 2, 2)); *sd.nodes_mut() = btreemap! {2=>Node::new("2", Endpoint::new("2", 2))}; @@ -230,8 +237,10 @@ async fn build_3_levels() -> anyhow::Result { lm.freeze_writable(); let sd = lm.writable_mut().sys_data_mut(); - *sd.last_membership_mut() = - StoredMembership::new(Some(log_id(3, 3, 3)), Membership::new(vec![], ())?); + *sd.last_membership_mut() = StoredMembership::new( + Some(log_id(3, 3, 3)), + Membership::new_with_defaults(vec![], []), + ); *sd.last_applied_mut() = Some(log_id(3, 3, 3)); *sd.nodes_mut() = btreemap! {3=>Node::new("3", Endpoint::new("3", 3))}; diff --git a/src/meta/raft-store/src/sm_v003/compact_with_db_test.rs b/src/meta/raft-store/src/sm_v003/compact_with_db_test.rs index 2f45db1daf830..2d3302fddf4de 100644 --- a/src/meta/raft-store/src/sm_v003/compact_with_db_test.rs +++ b/src/meta/raft-store/src/sm_v003/compact_with_db_test.rs @@ -44,7 +44,10 @@ async fn test_leveled_query_with_db() -> anyhow::Result<()> { assert_eq!(lm.curr_seq(), 7); assert_eq!( lm.last_membership_ref(), - &StoredMembership::new(Some(log_id(3, 3, 3)), Membership::new(vec![], ())?) + &StoredMembership::new( + Some(log_id(3, 3, 3)), + Membership::new_with_defaults(vec![], []) + ) ); assert_eq!(lm.last_applied_ref(), &Some(log_id(3, 3, 3))); assert_eq!( @@ -93,7 +96,7 @@ async fn test_leveled_query_with_expire_index() -> anyhow::Result<()> { assert_eq!(lm.curr_seq(), 4); assert_eq!( lm.last_membership_ref(), - &StoredMembership::new(None, Membership::new(vec![], ())?) + &StoredMembership::new(None, Membership::new_with_defaults(vec![], [])) ); assert_eq!(lm.last_applied_ref(), &None); assert_eq!(lm.nodes_ref(), &btreemap! {}); @@ -161,7 +164,10 @@ async fn test_compact() -> anyhow::Result<()> { assert_eq!(db.curr_seq(), 7); assert_eq!( db.last_membership_ref(), - &StoredMembership::new(Some(log_id(3, 3, 3)), Membership::new(vec![], ())?) + &StoredMembership::new( + Some(log_id(3, 3, 3)), + Membership::new_with_defaults(vec![], []) + ) ); assert_eq!(db.last_applied_ref(), &Some(log_id(3, 3, 3))); assert_eq!( @@ -210,7 +216,7 @@ async fn test_compact_expire_index() -> anyhow::Result<()> { assert_eq!(db.curr_seq(), 4); assert_eq!( db.last_membership_ref(), - &StoredMembership::new(None, Membership::new(vec![], ())?) + &StoredMembership::new(None, Membership::new_with_defaults(vec![], [])) ); assert_eq!(db.last_applied_ref(), &None); assert_eq!(db.nodes_ref(), &btreemap! {}); @@ -274,7 +280,10 @@ async fn test_compact_output_3_level() -> anyhow::Result<()> { assert_eq!(sys_data.curr_seq(), 7); assert_eq!( sys_data.last_membership_ref(), - &StoredMembership::new(Some(log_id(3, 3, 3)), Membership::new(vec![], ()).unwrap()) + &StoredMembership::new( + Some(log_id(3, 3, 3)), + Membership::new_with_defaults(vec![], []) + ) ); assert_eq!(sys_data.last_applied_ref(), &Some(log_id(3, 3, 3))); assert_eq!( @@ -305,8 +314,10 @@ async fn build_3_levels() -> anyhow::Result<(LeveledMap, impl Drop)> { let mut lm = LeveledMap::default(); let sd = lm.writable_mut().sys_data_mut(); - *sd.last_membership_mut() = - StoredMembership::new(Some(log_id(1, 1, 1)), Membership::new(vec![], ())?); + *sd.last_membership_mut() = StoredMembership::new( + Some(log_id(1, 1, 1)), + Membership::new_with_defaults(vec![], []), + ); *sd.last_applied_mut() = Some(log_id(1, 1, 1)); *sd.nodes_mut() = btreemap! {1=>Node::new("1", Endpoint::new("1", 1))}; @@ -319,8 +330,10 @@ async fn build_3_levels() -> anyhow::Result<(LeveledMap, impl Drop)> { lm.freeze_writable(); let sd = lm.writable_mut().sys_data_mut(); - *sd.last_membership_mut() = - StoredMembership::new(Some(log_id(2, 2, 2)), Membership::new(vec![], ())?); + *sd.last_membership_mut() = StoredMembership::new( + Some(log_id(2, 2, 2)), + Membership::new_with_defaults(vec![], []), + ); *sd.last_applied_mut() = Some(log_id(2, 2, 2)); *sd.nodes_mut() = btreemap! {2=>Node::new("2", Endpoint::new("2", 2))}; @@ -332,8 +345,10 @@ async fn build_3_levels() -> anyhow::Result<(LeveledMap, impl Drop)> { lm.freeze_writable(); let sd = lm.writable_mut().sys_data_mut(); - *sd.last_membership_mut() = - StoredMembership::new(Some(log_id(3, 3, 3)), Membership::new(vec![], ())?); + *sd.last_membership_mut() = StoredMembership::new( + Some(log_id(3, 3, 3)), + Membership::new_with_defaults(vec![], []), + ); *sd.last_applied_mut() = Some(log_id(3, 3, 3)); *sd.nodes_mut() = btreemap! {3=>Node::new("3", Endpoint::new("3", 3))}; diff --git a/src/meta/raft-store/src/state_machine/testing.rs b/src/meta/raft-store/src/state_machine/testing.rs index 5ca229802ced8..5c9dabfb0ed02 100644 --- a/src/meta/raft-store/src/state_machine/testing.rs +++ b/src/meta/raft-store/src/state_machine/testing.rs @@ -29,9 +29,10 @@ pub fn snapshot_logs() -> (Vec, Vec) { let logs = vec![ Entry { log_id: new_log_id(1, 0, 1), - payload: EntryPayload::Membership( - Membership::new(vec![btreeset![1, 2, 3]], ()).unwrap(), - ), + payload: EntryPayload::Membership(Membership::new_with_defaults( + vec![btreeset![1, 2, 3]], + [], + )), }, Entry::new_blank(new_log_id(1, 0, 2)), Entry::new_blank(new_log_id(1, 0, 3)), @@ -45,9 +46,10 @@ pub fn snapshot_logs() -> (Vec, Vec) { }, Entry { log_id: new_log_id(1, 0, 5), - payload: EntryPayload::Membership( - Membership::new(vec![btreeset![4, 5, 6]], ()).unwrap(), - ), + payload: EntryPayload::Membership(Membership::new_with_defaults( + vec![btreeset![4, 5, 6]], + [], + )), }, Entry { log_id: new_log_id(1, 0, 6), diff --git a/src/meta/service/tests/it/store.rs b/src/meta/service/tests/it/store.rs index c18a8ccfb3e81..e35881f86799e 100644 --- a/src/meta/service/tests/it/store.rs +++ b/src/meta/service/tests/it/store.rs @@ -256,7 +256,7 @@ async fn test_meta_store_install_snapshot() -> anyhow::Result<()> { assert_eq!( StoredMembership::new( Some(log_id(1, 0, 5)), - Membership::new(vec![btreeset! {4,5,6}], ())? + Membership::new_with_defaults(vec![btreeset! {4,5,6}], []) ), mem );