12 changes: 9 additions & 3 deletions src/meta/service/src/api/grpc_server.rs
@@ -100,13 +100,19 @@ impl GrpcServer {
.add_service(grpc_srv)
.serve_with_shutdown(addr, async move {
let _ = started_tx.send(());
info!("metasrv starts to wait for stop signal: {}", addr);
info!(
"meta-service gRPC(on {}) starts to wait for stop signal",
addr
);
let _ = stop_rx.await;
info!("metasrv receives stop signal: {}", addr);
info!("meta-service gRPC(on {}) receives stop signal", addr);
})
.await;

info!("grpc task returned res: {:?}", res);
info!(
"meta-service gRPC(on {}) task returned res: {:?}",
addr, res
);
}
.in_span(Span::enter_with_local_parent("spawn-grpc")),
);
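For context, here is a minimal, self-contained sketch of the `serve_with_shutdown` + oneshot pattern that the reworded log lines above sit inside. It assumes tonic, tonic-health (as a stand-in service), tokio, and the `log` macros; the address and names are illustrative, not the project's actual wiring, and a logger (e.g. env_logger) would need to be initialized for the messages to appear.

```rust
// Sketch only: not the project's code. Shows a gRPC server task that waits on
// a oneshot stop signal and logs before/after shutdown, like the hunk above.
use std::time::Duration;

use tokio::sync::oneshot;
use tonic::transport::Server;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let addr: std::net::SocketAddr = "127.0.0.1:9191".parse()?;
    let (stop_tx, stop_rx) = oneshot::channel::<()>();

    // The standard gRPC health service stands in for the real meta-service.
    let (_reporter, health_srv) = tonic_health::server::health_reporter();

    let server = tokio::spawn(async move {
        let res = Server::builder()
            .add_service(health_srv)
            .serve_with_shutdown(addr, async move {
                log::info!("gRPC(on {}) starts to wait for stop signal", addr);
                let _ = stop_rx.await;
                log::info!("gRPC(on {}) receives stop signal", addr);
            })
            .await;
        // The serve future resolves only after the shutdown signal fires.
        log::info!("gRPC(on {}) task returned res: {:?}", addr, res);
    });

    // Elsewhere, shutting the server down is just firing the oneshot sender.
    tokio::time::sleep(Duration::from_millis(100)).await;
    let _ = stop_tx.send(());
    server.await?;
    Ok(())
}
```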
7 changes: 5 additions & 2 deletions src/meta/service/src/meta_service/meta_node.rs
@@ -287,7 +287,7 @@ impl MetaNode {
srv.serve_with_shutdown(socket_addr, async move {
let _ = running_rx.changed().await;
info!(
"signal received, shutting down: id={} {} ",
"running_rx for Raft server received, shutting down: id={} {} ",
node_id, ip_port
);
})
@@ -422,7 +422,10 @@ impl MetaNode {

if let Err(changed_err) = changed {
// Shutting down.
error!("{} when watching metrics_rx", changed_err);
info!(
"{}; when:(watching metrics_rx); quit subscribe_metrics() loop",
changed_err
);
break;
}

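For context, a minimal sketch of the `tokio::sync::watch` behavior the hunk above relies on: `changed()` returns `Err` once the sender side is dropped, which is treated as a normal shutdown rather than an error (hence the downgrade from `error!` to `info!`). The channel and names here are illustrative assumptions, not the project's actual `metrics_rx`.

```rust
// Sketch only: a watcher loop that exits cleanly when the watch sender is dropped.
use tokio::sync::watch;

#[tokio::main]
async fn main() {
    let (tx, mut rx) = watch::channel(0u64);

    let watcher = tokio::spawn(async move {
        loop {
            let changed = rx.changed().await;
            if let Err(changed_err) = changed {
                // Sender side is gone: report at info level and leave the loop,
                // analogous to quitting the subscribe_metrics() loop above.
                println!("{changed_err}; when:(watching rx); quit loop");
                break;
            }
            println!("value updated: {}", *rx.borrow());
        }
    });

    tx.send(1).expect("receiver still alive");
    drop(tx); // closing the channel ends the watcher loop
    watcher.await.unwrap();
}
```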
212 changes: 112 additions & 100 deletions src/meta/service/tests/it/grpc/metasrv_grpc_kv_api_restart_cluster.rs
@@ -18,8 +18,8 @@
use std::time::Duration;

use databend_common_base::base::Stoppable;
// use databend_common_meta_client::ClientHandle;
// use databend_common_meta_client::MetaGrpcClient;
use databend_common_meta_client::ClientHandle;
use databend_common_meta_client::MetaGrpcClient;
use databend_common_meta_kvapi::kvapi::KVApi;
use databend_common_meta_kvapi::kvapi::UpsertKVReq;
use log::info;
@@ -116,108 +116,120 @@ async fn test_kv_api_restart_cluster_write_read() -> anyhow::Result<()> {
Ok(())
}

// FIXME: Disable this test until https://github.com/databendlabs/databend/pull/16704/#issuecomment-2442094481 addressed.
/// - Start a cluster of 3.
/// - Test upsert kv and read on different nodes.
/// - Stop and restart the cluster.
/// - Test read kv using same grpc client.
// #[test(harness = meta_service_test_harness)]
// #[fastrace::trace]
// async fn test_kv_api_restart_cluster_token_expired() -> anyhow::Result<()> {
// fn make_key(tc: &MetaSrvTestContext, k: impl std::fmt::Display) -> String {
// let x = &tc.config.raft_config;
// format!("t-restart-cluster-{}-{}-{}", x.config_id, x.id, k)
// }
//
// async fn test_write_read_on_every_node(
// tcs: &[MetaSrvTestContext],
// client: &ClientHandle,
// key_suffix: &str,
// ) -> anyhow::Result<()> {
// info!("--- test write on every node: {}", key_suffix);
//
// for (i, tc) in tcs.iter().enumerate() {
// let k = make_key(tc, key_suffix);
// if i == 0 {
// let res = client.upsert_kv(UpsertKVReq::update(&k, &b(&k))).await?;
// info!("--- upsert res: {:?}", res);
// } else {
// let client = tc.grpc_client().await.unwrap();
// let res = client.upsert_kv(UpsertKVReq::update(&k, &b(&k))).await?;
// info!("--- upsert res: {:?}", res);
// }
//
// let res = client.get_kv(&k).await?;
// let res = res.unwrap();
//
// assert_eq!(k.into_bytes(), res.data);
// }
//
// Ok(())
// }
//
// let tcs = start_metasrv_cluster(&[0, 1, 2]).await?;
// let client = MetaGrpcClient::try_create(
// vec![tcs[0].config.grpc_api_address.clone()],
// "root",
// "xxx",
// None,
// Some(Duration::from_secs(10)),
// None,
// )?;
//
// info!("--- test write on a fresh cluster");
// let key_suffix = "1st";
// test_write_read_on_every_node(&tcs, &client, key_suffix).await?;
//
// info!("--- shutdown the cluster");
// let stopped_tcs = {
// let mut stopped_tcs = vec![];
// for mut tc in tcs {
// assert!(tc.meta_node.is_none());
//
// let mut srv = tc.grpc_srv.take().unwrap();
// srv.stop(None).await?;
//
// stopped_tcs.push(tc);
// }
// stopped_tcs
// };
//
// info!("--- restart the cluster");
// let tcs = {
// let mut tcs = vec![];
// for mut tc in stopped_tcs {
// start_metasrv_with_context(&mut tc).await?;
// tcs.push(tc);
// }
//
// for tc in &tcs {
// info!("--- wait until a leader is observed");
// // Every tcs[i] contains one meta node in this context.
// let g = tc.grpc_srv.as_ref().unwrap();
// let meta_node = g.get_meta_node();
// let metrics = meta_node
// .raft
// .wait(timeout())
// .metrics(|m| m.current_leader.is_some(), "a leader is observed")
// .await?;
//
// info!("got leader, metrics: {:?}", metrics);
// }
// tcs
// };
//
// info!("--- read use old client");
// let tc = &tcs[0];
// let k = make_key(tc, key_suffix);
// let res = client.get_kv(&k).await?;
// let res = res.unwrap();
//
// assert_eq!(b(k), res.data);
//
// Ok(())
// }
#[test(harness = meta_service_test_harness)]
#[fastrace::trace]
async fn test_kv_api_restart_cluster_token_expired() -> anyhow::Result<()> {
fn make_key(tc: &MetaSrvTestContext, k: impl std::fmt::Display) -> String {
let x = &tc.config.raft_config;
format!("t-restart-cluster-{}-{}-{}", x.config_id, x.id, k)
}

async fn test_write_read_on_every_node(
tcs: &[MetaSrvTestContext],
client: &ClientHandle,
key_suffix: &str,
) -> anyhow::Result<()> {
info!("--- test write on every node: {}", key_suffix);

for (i, tc) in tcs.iter().enumerate() {
let k = make_key(tc, key_suffix);
if i == 0 {
let res = client.upsert_kv(UpsertKVReq::update(&k, &b(&k))).await?;
info!("--- upsert res: {:?}", res);
} else {
let client = tc.grpc_client().await.unwrap();
let res = client.upsert_kv(UpsertKVReq::update(&k, &b(&k))).await?;
info!("--- upsert res: {:?}", res);
}

let res = client.get_kv(&k).await?;
let res = res.unwrap();

assert_eq!(k.into_bytes(), res.data);
}

Ok(())
}

let tcs = start_metasrv_cluster(&[0, 1, 2]).await?;
let client = MetaGrpcClient::try_create(
vec![tcs[0].config.grpc_api_address.clone()],
"root",
"xxx",
// Without timeout, the client will not be able to reconnect.
// This is an issue of the http client.
Some(Duration::from_secs(1)),
Some(Duration::from_secs(10)),
None,
)?;

info!("--- test write on a fresh cluster");
let key_suffix = "1st";
test_write_read_on_every_node(&tcs, &client, key_suffix).await?;

info!("--- shutdown the cluster");
let stopped_tcs = {
let mut stopped_tcs = vec![];
for mut tc in tcs {
assert!(tc.meta_node.is_none());

let mut srv = tc.grpc_srv.take().unwrap();
srv.stop(None).await?;

stopped_tcs.push(tc);
}
stopped_tcs
};

info!("--- restart the cluster");
let tcs = {
let mut tcs = vec![];
for mut tc in stopped_tcs {
info!(
"--- starting metasrv: {:?}",
tc.config.raft_config.raft_api_addr().await?
);
start_metasrv_with_context(&mut tc).await?;

info!(
"--- started metasrv: {:?}",
tc.config.raft_config.raft_api_addr().await?
);

// sleep(Duration::from_secs(3)).await;
tcs.push(tc);
}

for tc in &tcs {
info!("--- wait until a leader is observed");
// Every tcs[i] contains one meta node in this context.
let g = tc.grpc_srv.as_ref().unwrap();
let meta_node = g.get_meta_node();
let metrics = meta_node
.raft
.wait(timeout())
.metrics(|m| m.current_leader.is_some(), "a leader is observed")
.await?;

info!("got leader, metrics: {:?}", metrics);
}
tcs
};

info!("--- read use old client");
let tc = &tcs[0];
let k = make_key(tc, key_suffix);
let res = client.get_kv(&k).await?;
let res = res.unwrap();

assert_eq!(b(k), res.data);

Ok(())
}

// Election timeout is 8~12 sec.
// A raft node waits for an interval of election timeout before starting election