Skip to content

Commit e9a6a53

Browse files
committed
IT: Add test for schema agreement with paused node
1 parent 18a2ce6 commit e9a6a53

File tree

3 files changed

+208
-0
lines changed

3 files changed

+208
-0
lines changed

scylla/tests/integration/session/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ mod history;
55
mod new_session;
66
mod pager;
77
mod retries;
8+
mod schema_agreement;
89
mod self_identity;
910
mod tracing;
1011
mod use_keyspace;
Lines changed: 158 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,158 @@
1+
use std::sync::Arc;
2+
3+
use assert_matches::assert_matches;
4+
use scylla::client::session::Session;
5+
use scylla::client::session_builder::SessionBuilder;
6+
use scylla::errors::{ExecutionError, RequestAttemptError, SchemaAgreementError};
7+
use scylla::policies::load_balancing::{NodeIdentifier, SingleTargetLoadBalancingPolicy};
8+
use scylla::response::query_result::QueryResult;
9+
use scylla::statement::Statement;
10+
use scylla_proxy::{
11+
Condition, ProxyError, Reaction, RequestOpcode, RequestReaction, RequestRule, RunningProxy,
12+
ShardAwareness, WorkerError,
13+
};
14+
15+
use crate::utils::{
16+
calculate_proxy_host_ids, setup_tracing, test_with_3_node_cluster, unique_keyspace_name,
17+
};
18+
19+
async fn run_some_ddl_with_unreachable_node(
20+
coordinator: NodeIdentifier,
21+
paused: usize,
22+
session: &Session,
23+
running_proxy: &mut RunningProxy,
24+
) -> Result<QueryResult, ExecutionError> {
25+
// Prevent fetching schema version.
26+
// It simulates a node that became unreachable after our DDL completed,
27+
// but the pool in the driver is not yet `Broken`.
28+
running_proxy.running_nodes[paused].change_request_rules(Some(vec![RequestRule(
29+
Condition::and(
30+
Condition::not(Condition::ConnectionRegisteredAnyEvent),
31+
Condition::and(
32+
Condition::RequestOpcode(RequestOpcode::Query),
33+
Condition::BodyContainsCaseSensitive(Box::new(*b"system.local")),
34+
),
35+
),
36+
// Simulates driver discovering that node is unreachable.
37+
RequestReaction::drop_connection(),
38+
)]));
39+
40+
let ks = unique_keyspace_name();
41+
let mut request = Statement::new(format!("CREATE KEYSPACE {ks} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1}}"));
42+
request.set_load_balancing_policy(Some(SingleTargetLoadBalancingPolicy::new(
43+
coordinator,
44+
None,
45+
)));
46+
47+
let result = session.query_unpaged(request, &[]).await;
48+
49+
// Cleanup
50+
running_proxy.running_nodes[paused].change_request_rules(Some(vec![]));
51+
session
52+
.query_unpaged(format!("DROP KEYSPACE {ks}"), &[])
53+
.await
54+
.unwrap();
55+
56+
result
57+
}
58+
59+
// Verifies that auto schema agreement (performed after receiving response of DDL request) works correctly
60+
// when a node is paused.
61+
#[tokio::test]
62+
#[cfg_attr(scylla_cloud_tests, ignore)]
63+
async fn test_schema_await_with_paused_node() {
64+
setup_tracing();
65+
66+
let res = test_with_3_node_cluster(
67+
ShardAwareness::QueryNode,
68+
|proxy_uris, translation_map, mut running_proxy| async move {
69+
// DB preparation phase
70+
let session: Session = SessionBuilder::new()
71+
.known_node(proxy_uris[0].as_str())
72+
.address_translator(Arc::new(translation_map.clone()))
73+
.build()
74+
.await
75+
.unwrap();
76+
77+
let host_ids = calculate_proxy_host_ids(&proxy_uris, &translation_map, &session);
78+
79+
{
80+
// Case 1: Paused node is a coordinator for DDL.
81+
// DDL needs to fail.
82+
let result = run_some_ddl_with_unreachable_node(
83+
NodeIdentifier::HostId(host_ids[1]),
84+
1,
85+
&session,
86+
&mut running_proxy,
87+
)
88+
.await;
89+
assert_matches!(
90+
result,
91+
Err(ExecutionError::SchemaAgreementError(
92+
SchemaAgreementError::RequestError(
93+
RequestAttemptError::BrokenConnectionError(_)
94+
)
95+
))
96+
)
97+
}
98+
99+
{
100+
// Case 2: Paused node is NOT a coordinator for DDL.
101+
// DDL should succeed, because auto schema agreement only needs available nodes to agree.
102+
let result = run_some_ddl_with_unreachable_node(
103+
NodeIdentifier::HostId(host_ids[2]),
104+
1,
105+
&session,
106+
&mut running_proxy,
107+
)
108+
.await;
109+
assert_matches!(result, Ok(_))
110+
}
111+
112+
{
113+
// Case 3: Paused node is a coordinator for DDL, and is used by control connection.
114+
// It is the same as case 1, but paused node is also control connection.
115+
// DDL needs to fail.
116+
let result = run_some_ddl_with_unreachable_node(
117+
NodeIdentifier::HostId(host_ids[0]),
118+
0,
119+
&session,
120+
&mut running_proxy,
121+
)
122+
.await;
123+
assert_matches!(
124+
result,
125+
Err(ExecutionError::SchemaAgreementError(
126+
SchemaAgreementError::RequestError(
127+
RequestAttemptError::BrokenConnectionError(_)
128+
)
129+
))
130+
)
131+
}
132+
133+
{
134+
// Case 4: Paused node is NOT a coordinator for DDL, but is used by control connection.
135+
// It is the same as case 2, but paused node is also control connection.
136+
// DDL should succeed, because auto schema agreement only needs available nodes to agree,
137+
// and control connection is not used for that at all.
138+
let result = run_some_ddl_with_unreachable_node(
139+
NodeIdentifier::HostId(host_ids[1]),
140+
0,
141+
&session,
142+
&mut running_proxy,
143+
)
144+
.await;
145+
assert_matches!(result, Ok(_))
146+
}
147+
148+
running_proxy
149+
},
150+
)
151+
.await;
152+
153+
match res {
154+
Ok(()) => (),
155+
Err(ProxyError::Worker(WorkerError::DriverDisconnected(_))) => (),
156+
Err(err) => panic!("{}", err),
157+
}
158+
}

scylla/tests/integration/utils.rs

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ use std::sync::atomic::{AtomicUsize, Ordering};
2121
use std::sync::Arc;
2222
use std::time::{Duration, SystemTime, UNIX_EPOCH};
2323
use tracing::{error, warn};
24+
use uuid::Uuid;
2425

2526
use scylla_proxy::{Node, Proxy, ProxyError, RunningProxy, ShardAwareness};
2627

@@ -320,3 +321,51 @@ impl PerformDDL for CachingSession {
320321
self.execute_unpaged(query, &[]).await.map(|_| ())
321322
}
322323
}
324+
325+
/// Calculates a list of nodes host ids, in the same order as passed proxy_uris.
326+
/// Useful if a test wants to set rules on some node, and then send requests to this node.
327+
pub(crate) fn calculate_proxy_host_ids(
328+
proxy_uris: &[String],
329+
translation_map: &HashMap<SocketAddr, SocketAddr>,
330+
session: &Session,
331+
) -> Vec<Uuid> {
332+
let proxy_ips: Vec<IpAddr> = proxy_uris
333+
.iter()
334+
.map(|uri| uri.as_str().parse::<SocketAddr>().unwrap().ip())
335+
.collect::<Vec<_>>();
336+
337+
let real_node_ips: Vec<IpAddr> = {
338+
let reversed_translation_map = translation_map
339+
.iter()
340+
.map(|(a, b)| (b.ip(), a.ip()))
341+
.collect::<HashMap<_, _>>();
342+
343+
proxy_uris
344+
.iter()
345+
.map(|uri| {
346+
*reversed_translation_map
347+
.get(&uri.as_str().parse::<SocketAddr>().unwrap().ip())
348+
.unwrap()
349+
})
350+
.collect::<Vec<_>>()
351+
};
352+
assert_eq!(proxy_ips.len(), real_node_ips.len());
353+
354+
let state = session.get_cluster_state();
355+
let nodes = state.get_nodes_info();
356+
357+
let host_ids: Vec<Uuid> = proxy_ips
358+
.into_iter()
359+
.zip(real_node_ips)
360+
.map(|(proxy_ip, real_ip)| {
361+
let node = nodes
362+
.iter()
363+
.find(|n| n.address.ip() == proxy_ip || n.address.ip() == real_ip)
364+
.unwrap();
365+
node.host_id
366+
})
367+
.collect();
368+
369+
assert_eq!(host_ids.len(), proxy_uris.len());
370+
host_ids
371+
}

0 commit comments

Comments
 (0)