Skip to content

Commit 2e180e7

Browse files
committed
Provide all resolved IPs to mysql, postgres, and ssh clients
1 parent 90ab9a4 commit 2e180e7

File tree

12 files changed

+162
-133
lines changed

12 files changed

+162
-133
lines changed

Cargo.lock

Lines changed: 7 additions & 7 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -192,6 +192,9 @@ postgres-types = { git = "https://github.com/MaterializeInc/rust-postgres" }
192192
postgres-openssl = { git = "https://github.com/MaterializeInc/rust-postgres" }
193193
postgres_array = { git = "https://github.com/MaterializeInc/rust-postgres-array" }
194194

195+
# Waiting on https://github.com/blackbeam/mysql_async/pull/300
196+
mysql_async = { git = "https://github.com/MaterializeInc/mysql_async", rev = "6afd2136181f2ec93b7ca7de524f6d02b6f10c1d" }
197+
195198
# Waiting on https://github.com/MaterializeInc/serde-value/pull/35.
196199
serde-value = { git = "https://github.com/MaterializeInc/serde-value.git" }
197200

src/kafka-util/src/client.rs

Lines changed: 16 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ use anyhow::{anyhow, Context};
2222
use crossbeam::channel::{unbounded, Receiver, Sender};
2323
use mz_ore::collections::CollectionExt;
2424
use mz_ore::error::ErrorExt;
25-
use mz_ore::netio::resolve_external_address;
25+
use mz_ore::netio::resolve_address;
2626
use mz_ssh_util::tunnel::{SshTimeoutConfig, SshTunnelConfig, SshTunnelStatus};
2727
use mz_ssh_util::tunnel_manager::{ManagedSshTunnelHandle, SshTunnelManager};
2828
use rdkafka::client::{BrokerAddr, Client, NativeClient, OAuthToken};
@@ -297,7 +297,7 @@ enum BrokerRewriteHandle {
297297
FailedDefaultSshTunnel(String),
298298
/// We store an error if DNS resolution fails when resolving
299299
/// a new broker host.
300-
FailedDNSResolution(String),
300+
FailedDnsResolution(String),
301301
}
302302

303303
/// Tunneling clients
@@ -425,7 +425,7 @@ impl<C> TunnelingClientContext<C> {
425425
BrokerRewriteHandle::FailedDefaultSshTunnel(e) => {
426426
SshTunnelStatus::Errored(e.clone())
427427
}
428-
BrokerRewriteHandle::Simple(_) | BrokerRewriteHandle::FailedDNSResolution(_) => {
428+
BrokerRewriteHandle::Simple(_) | BrokerRewriteHandle::FailedDnsResolution(_) => {
429429
SshTunnelStatus::Running
430430
}
431431
})
@@ -447,7 +447,7 @@ impl<C> TunnelingClientContext<C> {
447447
let broker_status = rewrites
448448
.values()
449449
.map(|handle| match handle {
450-
BrokerRewriteHandle::FailedDNSResolution(e) => BrokerStatus::Failed(e.clone()),
450+
BrokerRewriteHandle::FailedDnsResolution(e) => BrokerStatus::Failed(e.clone()),
451451
_ => BrokerStatus::Nominal,
452452
})
453453
.fold(BrokerStatus::Nominal, |acc, status| match (acc, status) {
@@ -486,7 +486,7 @@ where
486486
}
487487
}
488488
BrokerRewriteHandle::FailedDefaultSshTunnel(_)
489-
| BrokerRewriteHandle::FailedDNSResolution(_) => {
489+
| BrokerRewriteHandle::FailedDnsResolution(_) => {
490490
unreachable!()
491491
}
492492
};
@@ -510,28 +510,18 @@ where
510510
match rewrite {
511511
None
512512
| Some(BrokerRewriteHandle::FailedDefaultSshTunnel(_))
513-
| Some(BrokerRewriteHandle::FailedDNSResolution(_)) => {
513+
| Some(BrokerRewriteHandle::FailedDnsResolution(_)) => {
514514
match &self.default_tunnel {
515515
TunnelConfig::Ssh(default_tunnel) => {
516516
// Multiple users could all run `connect` at the same time; only one ssh
517517
// tunnel will ever be connected, and only one will be inserted into the
518518
// map.
519519
let ssh_tunnel = self.runtime.block_on(async {
520-
// Ensure the default tunnel host is resolved to an external address.
521-
let resolved_tunnel_addr = resolve_external_address(
522-
&default_tunnel.host,
523-
self.enforce_external_addresses,
524-
)
525-
.await?;
526-
let tunnel_config = SshTunnelConfig {
527-
host: resolved_tunnel_addr.to_string(),
528-
port: default_tunnel.port,
529-
user: default_tunnel.user.clone(),
530-
key_pair: default_tunnel.key_pair.clone(),
531-
};
532520
self.ssh_tunnel_manager
533521
.connect(
534-
tunnel_config,
522+
// We know the default_tunnel has already been validated by the `resolve_address`
523+
// method when it was provided to the client, so we don't need to check it again.
524+
default_tunnel.clone(),
535525
&addr.host,
536526
addr.port.parse().unwrap(),
537527
self.ssh_timeout_config,
@@ -546,7 +536,7 @@ where
546536
if matches!(
547537
o.get(),
548538
BrokerRewriteHandle::FailedDefaultSshTunnel(_)
549-
| BrokerRewriteHandle::FailedDNSResolution(_)
539+
| BrokerRewriteHandle::FailedDnsResolution(_)
550540
) =>
551541
{
552542
o.insert(BrokerRewriteHandle::SshTunnel(
@@ -594,15 +584,14 @@ where
594584
// If no rewrite is specified, we still should check that this potentially
595585
// new broker address is a global address.
596586
self.runtime.block_on(async {
597-
match resolve_external_address(
598-
&addr.host,
599-
self.enforce_external_addresses,
600-
)
601-
.await
587+
match resolve_address(&addr.host, self.enforce_external_addresses).await
602588
{
603589
Ok(resolved) => {
604590
let rewrite = BrokerRewriteHandle::Simple(BrokerRewrite {
605-
host: resolved.to_string(),
591+
// `resolve_address` will always return at least one address.
592+
// TODO: Once we have a way to provide multiple hosts to rdkafka, we should
593+
// return all resolved addresses here.
594+
host: resolved.first().unwrap().to_string(),
606595
port: addr.port.parse().ok(),
607596
});
608597
return_rewrite(&rewrite)
@@ -616,7 +605,7 @@ where
616605
// Write an error if no one else has already written one.
617606
let mut rewrites = self.rewrites.lock().expect("poisoned");
618607
rewrites.entry(addr.clone()).or_insert_with(|| {
619-
BrokerRewriteHandle::FailedDNSResolution(
608+
BrokerRewriteHandle::FailedDnsResolution(
620609
e.to_string_with_causes(),
621610
)
622611
});

src/mysql-util/src/tunnel.rs

Lines changed: 5 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
// the Business Source License, use of this software will be governed
88
// by the Apache License, Version 2.0.
99

10+
use std::net::IpAddr;
1011
use std::ops::{Deref, DerefMut};
1112
use std::time::Duration;
1213

@@ -25,7 +26,7 @@ use crate::MySqlError;
2526
#[derive(Debug, PartialEq, Clone)]
2627
pub enum TunnelConfig {
2728
/// Establish a direct TCP connection to the database host.
28-
Direct { tcp_host_override: Option<String> },
29+
Direct { resolved_ips: Option<Vec<IpAddr>> },
2930
/// Establish a TCP connection to the database via an SSH tunnel.
3031
/// This means first establishing an SSH connection to a bastion host,
3132
/// and then opening a separate connection from that host to the database.
@@ -225,27 +226,9 @@ impl Config {
225226
ssh_tunnel_manager: &SshTunnelManager,
226227
) -> Result<MySqlConn, MySqlError> {
227228
match &self.tunnel {
228-
TunnelConfig::Direct { tcp_host_override } => {
229-
// Override the connection host for the actual TCP connection to point to
230-
// the privatelink hostname instead.
231-
let mut opts_builder = OptsBuilder::from_opts(self.inner.clone());
232-
233-
if let Some(tcp_override) = tcp_host_override {
234-
opts_builder = opts_builder.ip_or_hostname(tcp_override);
235-
236-
if let Some(ssl_opts) = self.inner.ssl_opts() {
237-
if !ssl_opts.skip_domain_validation() {
238-
// If the TLS configuration will validate the hostname, we need to set
239-
// the TLS hostname back to the actual upstream host and not the
240-
// TCP hostname.
241-
opts_builder = opts_builder.ssl_opts(Some(
242-
ssl_opts.clone().with_danger_tls_hostname_override(Some(
243-
self.inner.ip_or_hostname().to_string(),
244-
)),
245-
));
246-
}
247-
}
248-
};
229+
TunnelConfig::Direct { resolved_ips } => {
230+
let opts_builder =
231+
OptsBuilder::from_opts(self.inner.clone()).resolved_ips(resolved_ips.clone());
249232

250233
Ok(MySqlConn {
251234
conn: Conn::new(opts_builder).await.map_err(MySqlError::from)?,

src/ore/src/netio/dns.rs

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,10 @@ const DUMMY_PORT: u16 = 11111;
2222

2323
/// Resolves a host address and ensures it is a global address when `enforce_global` is set.
2424
/// This parameter is useful when connecting to user-defined unverified addresses.
25-
pub async fn resolve_external_address(
25+
pub async fn resolve_address(
2626
mut host: &str,
2727
enforce_global: bool,
28-
) -> Result<IpAddr, io::Error> {
28+
) -> Result<Vec<IpAddr>, io::Error> {
2929
// `net::lookup_host` requires a port to be specified, but we don't care about the port.
3030
let mut port = DUMMY_PORT;
3131
// If a port is already specified, use it and remove it from the host.
@@ -37,23 +37,26 @@ pub async fn resolve_external_address(
3737
}
3838

3939
let mut addrs = lookup_host((host, port)).await?;
40-
41-
if let Some(addr) = addrs.next() {
40+
let mut ips = Vec::new();
41+
while let Some(addr) = addrs.next() {
4242
let ip = addr.ip();
4343
if enforce_global && !is_global(ip) {
4444
Err(io::Error::new(
4545
io::ErrorKind::AddrNotAvailable,
4646
"address is not global",
47-
))
47+
))?
4848
} else {
49-
Ok(ip)
49+
ips.push(ip);
5050
}
51-
} else {
51+
}
52+
53+
if ips.len() == 0 {
5254
Err(io::Error::new(
5355
io::ErrorKind::AddrNotAvailable,
5456
"no addresses found",
55-
))
57+
))?
5658
}
59+
Ok(ips)
5760
}
5861

5962
fn is_global(addr: IpAddr) -> bool {

src/ore/src/netio/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ mod read_exact;
2222
mod socket;
2323

2424
pub use crate::netio::async_ready::AsyncReady;
25-
pub use crate::netio::dns::resolve_external_address;
25+
pub use crate::netio::dns::resolve_address;
2626
pub use crate::netio::framed::{FrameTooBig, MAX_FRAME_SIZE};
2727
pub use crate::netio::read_exact::{read_exact_or_eof, ReadExactOrEof};
2828
pub use crate::netio::socket::{Listener, SocketAddr, SocketAddrType, Stream, UnixSocketAddr};

src/postgres-util/src/tunnel.rs

Lines changed: 27 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
// the Business Source License, use of this software will be governed
88
// by the Apache License, Version 2.0.
99

10+
use std::net::IpAddr;
1011
use std::time::Duration;
1112

1213
use mz_ore::option::OptionExt;
@@ -37,7 +38,7 @@ macro_rules! bail_generic {
3738
#[derive(Debug, PartialEq, Clone)]
3839
pub enum TunnelConfig {
3940
/// Establish a direct TCP connection to the database host.
40-
Direct { tcp_host_override: Option<String> },
41+
Direct { resolved_ips: Option<Vec<IpAddr>> },
4142
/// Establish a TCP connection to the database via an SSH tunnel.
4243
/// This means first establishing an SSH connection to a bastion host,
4344
/// and then opening a separate connection from that host to the database.
@@ -222,19 +223,22 @@ impl Config {
222223
})?;
223224

224225
match &self.tunnel {
225-
TunnelConfig::Direct { tcp_host_override } => {
226-
// Override the TCP host we connect to, leaving the host used for TLS verification
227-
// as the original host
228-
if let Some(tcp_override) = tcp_host_override {
229-
let (host, _) = self.address()?;
230-
postgres_config.tls_verify_host(host);
231-
232-
match postgres_config.get_hosts_mut() {
233-
[Host::Tcp(host)] => *host = tcp_override.clone(),
226+
TunnelConfig::Direct { resolved_ips } => {
227+
if let Some(ips) = resolved_ips {
228+
let host = match postgres_config.get_hosts() {
229+
[Host::Tcp(host)] => host,
234230
_ => bail_generic!(
235231
"only TCP connections to a single PostgreSQL server are supported"
236232
),
237233
}
234+
.to_owned();
235+
// The number of 'host' and 'hostaddr' values must be the same.
236+
for (idx, ip) in ips.iter().enumerate() {
237+
if idx != 0 {
238+
postgres_config.host(&host);
239+
}
240+
postgres_config.hostaddr(ip.clone());
241+
}
238242
};
239243

240244
let (client, connection) = postgres_config.connect(tls).await?;
@@ -276,17 +280,25 @@ impl Config {
276280
// `tokio_postgres::Config` to do this is somewhat confusing, and requires we edit
277281
// the singular host in place.
278282

279-
let (host, _) = self.address()?;
280-
postgres_config.tls_verify_host(host);
281-
282283
let privatelink_host = mz_cloud_resources::vpc_endpoint_name(*connection_id);
284+
let privatelink_addrs = tokio::net::lookup_host(privatelink_host).await?;
283285

284-
match postgres_config.get_hosts_mut() {
285-
[Host::Tcp(host)] => *host = privatelink_host,
286+
// Override the actual IPs to connect to for the TCP connection, leaving the original host in-place
287+
// for TLS verification
288+
let host = match postgres_config.get_hosts() {
289+
[Host::Tcp(host)] => host,
286290
_ => bail_generic!(
287291
"only TCP connections to a single PostgreSQL server are supported"
288292
),
289293
}
294+
.to_owned();
295+
// The number of 'host' and 'hostaddr' values must be the same.
296+
for (idx, addr) in privatelink_addrs.enumerate() {
297+
if idx != 0 {
298+
postgres_config.host(&host);
299+
}
300+
postgres_config.hostaddr(addr.ip());
301+
}
290302

291303
let (client, connection) = postgres_config.connect(tls).await?;
292304
task::spawn(|| task_name, connection);

0 commit comments

Comments
 (0)