diff --git a/Cargo.lock b/Cargo.lock index ae4c436e5c7f6..a6b1da6169b3b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3476,8 +3476,7 @@ dependencies = [ [[package]] name = "mysql_async" version = "0.34.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fbfe87d7e35cb72363326216cc1712b865d8d4f70abf3b2d2e6b251fb6b2f427" +source = "git+https://github.com/MaterializeInc/mysql_async?rev=6afd2136181f2ec93b7ca7de524f6d02b6f10c1d#6afd2136181f2ec93b7ca7de524f6d02b6f10c1d" dependencies = [ "bytes", "crossbeam", @@ -5812,6 +5811,7 @@ version = "0.0.0" dependencies = [ "anyhow", "futures", + "itertools", "mz-ore", "openssh", "openssh-mux-client", @@ -7040,7 +7040,7 @@ checksum = "15eb2c6e362923af47e13c23ca5afb859e83d54452c55b0b9ac763b8f7c1ac16" [[package]] name = "postgres" version = "0.19.5" -source = "git+https://github.com/MaterializeInc/rust-postgres#b759caa33610403aa74b1cfdd37f45eb3100c9af" +source = "git+https://github.com/MaterializeInc/rust-postgres#91522e47643ebb6d6a5e392957b2319e5bb522ad" dependencies = [ "bytes", "fallible-iterator", @@ -7053,7 +7053,7 @@ dependencies = [ [[package]] name = "postgres-openssl" version = "0.5.0" -source = "git+https://github.com/MaterializeInc/rust-postgres#b759caa33610403aa74b1cfdd37f45eb3100c9af" +source = "git+https://github.com/MaterializeInc/rust-postgres#91522e47643ebb6d6a5e392957b2319e5bb522ad" dependencies = [ "openssl", "tokio", @@ -7064,7 +7064,7 @@ dependencies = [ [[package]] name = "postgres-protocol" version = "0.6.5" -source = "git+https://github.com/MaterializeInc/rust-postgres#b759caa33610403aa74b1cfdd37f45eb3100c9af" +source = "git+https://github.com/MaterializeInc/rust-postgres#91522e47643ebb6d6a5e392957b2319e5bb522ad" dependencies = [ "base64 0.21.5", "byteorder", @@ -7081,7 +7081,7 @@ dependencies = [ [[package]] name = "postgres-types" version = "0.2.5" -source = "git+https://github.com/MaterializeInc/rust-postgres#b759caa33610403aa74b1cfdd37f45eb3100c9af" +source = "git+https://github.com/MaterializeInc/rust-postgres#91522e47643ebb6d6a5e392957b2319e5bb522ad" dependencies = [ "bytes", "chrono", @@ -9113,7 +9113,7 @@ dependencies = [ [[package]] name = "tokio-postgres" version = "0.7.8" -source = "git+https://github.com/MaterializeInc/rust-postgres#b759caa33610403aa74b1cfdd37f45eb3100c9af" +source = "git+https://github.com/MaterializeInc/rust-postgres#91522e47643ebb6d6a5e392957b2319e5bb522ad" dependencies = [ "async-trait", "byteorder", @@ -9128,6 +9128,7 @@ dependencies = [ "pin-project-lite", "postgres-protocol", "postgres-types", + "rand", "serde", "socket2 0.5.3", "tokio", diff --git a/Cargo.toml b/Cargo.toml index 5680cf5963ad3..0716a0ee67494 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -189,6 +189,9 @@ postgres-types = { git = "https://github.com/MaterializeInc/rust-postgres" } postgres-openssl = { git = "https://github.com/MaterializeInc/rust-postgres" } postgres_array = { git = "https://github.com/MaterializeInc/rust-postgres-array" } +# Waiting on https://github.com/blackbeam/mysql_async/pull/300 +mysql_async = { git = "https://github.com/MaterializeInc/mysql_async", rev = "6afd2136181f2ec93b7ca7de524f6d02b6f10c1d" } + # Waiting on https://github.com/MaterializeInc/serde-value/pull/35. 
serde-value = { git = "https://github.com/MaterializeInc/serde-value.git" } diff --git a/misc/cargo-vet/audits.toml b/misc/cargo-vet/audits.toml index 44dd334f029ab..c17bcf3c36298 100644 --- a/misc/cargo-vet/audits.toml +++ b/misc/cargo-vet/audits.toml @@ -187,6 +187,26 @@ who = "Gus Wynn " criteria = "maintained-and-necessary" version = "4.2.0" +[[audits.postgres]] +who = "Roshan Jobanputra " +criteria = "safe-to-deploy" +version = "0.19.5@git:91522e47643ebb6d6a5e392957b2319e5bb522ad" + +[[audits.postgres-openssl]] +who = "Roshan Jobanputra " +criteria = "safe-to-deploy" +version = "0.5.0@git:91522e47643ebb6d6a5e392957b2319e5bb522ad" + +[[audits.postgres-protocol]] +who = "Roshan Jobanputra " +criteria = "safe-to-deploy" +version = "0.6.5@git:91522e47643ebb6d6a5e392957b2319e5bb522ad" + +[[audits.postgres-types]] +who = "Roshan Jobanputra " +criteria = "safe-to-deploy" +version = "0.2.5@git:91522e47643ebb6d6a5e392957b2319e5bb522ad" + [[audits.quanta]] who = "Roshan Jobanputra " criteria = "safe-to-deploy" @@ -292,6 +312,11 @@ who = "Moritz Hoffmann " criteria = "safe-to-deploy" version = "0.12.0@git:46b28dc48bf5ed26dc0a0d5dbbd53c7964526f27" +[[audits.tokio-postgres]] +who = "Roshan Jobanputra " +criteria = "safe-to-deploy" +version = "0.7.8@git:91522e47643ebb6d6a5e392957b2319e5bb522ad" + [[audits.tracing-capture]] who = "Matt Jibson " criteria = "safe-to-deploy" diff --git a/misc/cargo-vet/config.toml b/misc/cargo-vet/config.toml index ca9b63c37d1ce..5d8b05f349f09 100644 --- a/misc/cargo-vet/config.toml +++ b/misc/cargo-vet/config.toml @@ -1010,22 +1010,6 @@ criteria = "safe-to-deploy" version = "0.3.15" criteria = "safe-to-deploy" -[[exemptions.postgres]] -version = "0.19.5@git:b759caa33610403aa74b1cfdd37f45eb3100c9af" -criteria = "safe-to-deploy" - -[[exemptions.postgres-openssl]] -version = "0.5.0@git:b759caa33610403aa74b1cfdd37f45eb3100c9af" -criteria = "safe-to-deploy" - -[[exemptions.postgres-protocol]] -version = "0.6.5@git:b759caa33610403aa74b1cfdd37f45eb3100c9af" -criteria = "safe-to-deploy" - -[[exemptions.postgres-types]] -version = "0.2.5@git:b759caa33610403aa74b1cfdd37f45eb3100c9af" -criteria = "safe-to-deploy" - [[exemptions.postgres_array]] version = "0.11.0@git:f58d0101e5198e04e8692629018d9b58f8543534" criteria = "safe-to-deploy" @@ -1514,10 +1498,6 @@ criteria = "safe-to-deploy" version = "0.2.12" criteria = "safe-to-deploy" -[[exemptions.tokio-postgres]] -version = "0.7.8@git:b759caa33610403aa74b1cfdd37f45eb3100c9af" -criteria = "safe-to-deploy" - [[exemptions.tokio-tungstenite]] version = "0.20.0" criteria = "safe-to-deploy" diff --git a/src/mysql-util/src/tunnel.rs b/src/mysql-util/src/tunnel.rs index dc2ae7e3332dd..09a91bd89a747 100644 --- a/src/mysql-util/src/tunnel.rs +++ b/src/mysql-util/src/tunnel.rs @@ -7,6 +7,8 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. +use std::collections::BTreeSet; +use std::net::IpAddr; use std::ops::{Deref, DerefMut}; use std::time::Duration; @@ -27,7 +29,11 @@ use crate::MySqlError; #[derive(Debug, PartialEq, Clone)] pub enum TunnelConfig { /// Establish a direct TCP connection to the database host. - Direct, + /// If `resolved_ips` is not None, the provided IPs will be used + /// rather than resolving the hostname. + Direct { + resolved_ips: Option>, + }, /// Establish a TCP connection to the database via an SSH tunnel. /// This means first establishing an SSH connection to a bastion host, /// and then opening a separate connection from that host to the database. 
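Review note: the new `Direct { resolved_ips }` variant (mirrored for PostgreSQL below) lets callers pin a connection to addresses that were already resolved and validated, instead of re-resolving the hostname at connect time. The next hunk threads the field into `OptsBuilder::resolved_ips`, which only exists on the pinned `MaterializeInc/mysql_async` fork (upstream PR blackbeam/mysql_async#300). A minimal, self-contained sketch of the variant and the intended call pattern — the enum here is a stand-in, and the hard-coded IP is illustrative:

```rust
use std::collections::BTreeSet;
use std::net::IpAddr;

/// Stand-in for the `TunnelConfig` introduced in this diff.
#[derive(Debug)]
enum TunnelConfig {
    /// If `resolved_ips` is `Some`, dial these addresses directly
    /// instead of re-resolving the hostname at connect time.
    Direct { resolved_ips: Option<BTreeSet<IpAddr>> },
}

fn main() {
    // Pretend DNS already happened (in the real code, via
    // `resolve_address`, which can also reject non-global IPs).
    let ips: BTreeSet<IpAddr> = ["192.0.2.7".parse().unwrap()].into();
    let tunnel = TunnelConfig::Direct { resolved_ips: Some(ips) };

    if let TunnelConfig::Direct { resolved_ips: Some(ips) } = &tunnel {
        // A driver would hand these to its options builder, e.g. the
        // forked `OptsBuilder::resolved_ips` seen in the next hunk.
        println!("connecting to {} pinned address(es)", ips.len());
    }
}
```

Using a `BTreeSet` rather than a `Vec` or `HashSet` keeps the address list deterministically ordered, which matters because these configs participate in `Ord`-based cache keys such as `SshTunnelKey` later in this patch.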
@@ -237,12 +243,18 @@ impl Config { ssh_tunnel_manager: &SshTunnelManager, ) -> Result { match &self.tunnel { - TunnelConfig::Direct => Ok(MySqlConn { - conn: Conn::new(self.inner.clone()) - .await - .map_err(MySqlError::from)?, - _ssh_tunnel_handle: None, - }), + TunnelConfig::Direct { resolved_ips } => { + let opts_builder = OptsBuilder::from_opts(self.inner.clone()).resolved_ips( + resolved_ips + .clone() + .map(|ips| ips.into_iter().collect::>()), + ); + + Ok(MySqlConn { + conn: Conn::new(opts_builder).await.map_err(MySqlError::from)?, + _ssh_tunnel_handle: None, + }) + } TunnelConfig::Ssh { config } => { let (host, port) = self.address(); let tunnel = ssh_tunnel_manager diff --git a/src/ore/src/netio.rs b/src/ore/src/netio.rs index 3537e7eb54e5e..e1268a7155a3c 100644 --- a/src/ore/src/netio.rs +++ b/src/ore/src/netio.rs @@ -16,11 +16,13 @@ //! Network I/O utilities. mod async_ready; +mod dns; mod framed; mod read_exact; mod socket; pub use crate::netio::async_ready::AsyncReady; +pub use crate::netio::dns::resolve_address; pub use crate::netio::framed::{FrameTooBig, MAX_FRAME_SIZE}; pub use crate::netio::read_exact::{read_exact_or_eof, ReadExactOrEof}; pub use crate::netio::socket::{Listener, SocketAddr, SocketAddrType, Stream, UnixSocketAddr}; diff --git a/src/ore/src/netio/dns.rs b/src/ore/src/netio/dns.rs new file mode 100644 index 0000000000000..588018431a5cf --- /dev/null +++ b/src/ore/src/netio/dns.rs @@ -0,0 +1,71 @@ +// Copyright Materialize, Inc. and contributors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License in the LICENSE file at the +// root of this repository, or online at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::BTreeSet; +use std::io; +use std::net::IpAddr; + +use tokio::net::lookup_host; + +const DUMMY_PORT: u16 = 11111; + +/// Resolves a host address and ensures it is a global address when `enforce_global` is set. +/// This parameter is useful when connecting to user-defined unverified addresses. +pub async fn resolve_address( + mut host: &str, + enforce_global: bool, +) -> Result, io::Error> { + // `net::lookup_host` requires a port to be specified, but we don't care about the port. + let mut port = DUMMY_PORT; + // If a port is already specified, use it and remove it from the host. + if let Some(idx) = host.find(':') { + if let Ok(p) = host[idx + 1..].parse() { + port = p; + host = &host[..idx]; + } + } + + let mut addrs = lookup_host((host, port)).await?; + let mut ips = BTreeSet::new(); + while let Some(addr) = addrs.next() { + let ip = addr.ip(); + if enforce_global && !is_global(ip) { + Err(io::Error::new( + io::ErrorKind::AddrNotAvailable, + "address is not global", + ))? + } else { + ips.insert(ip); + } + } + + if ips.len() == 0 { + Err(io::Error::new( + io::ErrorKind::AddrNotAvailable, + "no addresses found", + ))? 
+ } + Ok(ips) +} + +fn is_global(addr: IpAddr) -> bool { + // TODO: Switch to `addr.is_global()` once stable: https://github.com/rust-lang/rust/issues/27709 + match addr { + IpAddr::V4(ip) => { + !(ip.is_unspecified() || ip.is_private() || ip.is_loopback() || ip.is_link_local()) + } + IpAddr::V6(ip) => !(ip.is_loopback() || ip.is_unspecified()), + } +} diff --git a/src/postgres-util/src/tunnel.rs b/src/postgres-util/src/tunnel.rs index 57ebfdba52f52..42f1aa81ab51d 100644 --- a/src/postgres-util/src/tunnel.rs +++ b/src/postgres-util/src/tunnel.rs @@ -7,6 +7,8 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. +use std::collections::BTreeSet; +use std::net::IpAddr; use std::time::Duration; use mz_ore::future::{InTask, OreFutureExt}; @@ -38,7 +40,11 @@ macro_rules! bail_generic { #[derive(Debug, PartialEq, Clone)] pub enum TunnelConfig { /// Establish a direct TCP connection to the database host. - Direct, + /// If `resolved_ips` is not None, the provided IPs will be used + /// rather than resolving the hostname. + Direct { + resolved_ips: Option>, + }, /// Establish a TCP connection to the database via an SSH tunnel. /// This means first establishing an SSH connection to a bastion host, /// and then opening a separate connection from that host to the database. @@ -226,7 +232,26 @@ impl Config { })?; match &self.tunnel { - TunnelConfig::Direct => { + TunnelConfig::Direct { resolved_ips } => { + if let Some(ips) = resolved_ips { + let host = match postgres_config.get_hosts() { + [Host::Tcp(host)] => host, + _ => bail_generic!( + "only TCP connections to a single PostgreSQL server are supported" + ), + } + .to_owned(); + // Associate each resolved ip with the exact same, singular host, for tls + // verification. We are required to do this dance because `tokio-postgres` + // enforces that the number of 'host' and 'hostaddr' values must be the same. + for (idx, ip) in ips.iter().enumerate() { + if idx != 0 { + postgres_config.host(&host); + } + postgres_config.hostaddr(ip.clone()); + } + }; + let (client, connection) = async move { postgres_config.connect(tls).await } .run_in_task_if(self.in_task, || "pg_connect".to_string()) .await?; @@ -277,17 +302,27 @@ impl Config { // `tokio_postgres::Config` to do this is somewhat confusing, and requires we edit // the singular host in place. - let (host, _) = self.address()?; - postgres_config.tls_verify_host(host); - let privatelink_host = mz_cloud_resources::vpc_endpoint_name(*connection_id); + let privatelink_addrs = tokio::net::lookup_host(privatelink_host).await?; - match postgres_config.get_hosts_mut() { - [Host::Tcp(host)] => *host = privatelink_host, + // Override the actual IPs to connect to for the TCP connection, leaving the original host in-place + // for TLS verification + let host = match postgres_config.get_hosts() { + [Host::Tcp(host)] => host, _ => bail_generic!( "only TCP connections to a single PostgreSQL server are supported" ), } + .to_owned(); + // Associate each resolved ip with the exact same, singular host, for tls + // verification. We are required to do this dance because `tokio-postgres` + // enforces that the number of 'host' and 'hostaddr' values must be the same. 
+ for (idx, addr) in privatelink_addrs.enumerate() { + if idx != 0 { + postgres_config.host(&host); + } + postgres_config.hostaddr(addr.ip()); + } let (client, connection) = async move { postgres_config.connect(tls).await } .run_in_task_if(self.in_task, || "pg_connect".to_string()) diff --git a/src/ssh-util/Cargo.toml b/src/ssh-util/Cargo.toml index b9fa732528a56..234899bf46f4e 100644 --- a/src/ssh-util/Cargo.toml +++ b/src/ssh-util/Cargo.toml @@ -17,6 +17,7 @@ openssh-mux-client = "0.15.5" openssl = { version = "0.10.48", features = ["vendored"] } rand = "0.8.5" futures = "0.3.25" +itertools = "0.10.5" scopeguard = "1.1.0" serde = { version = "1.0.152", features = ["derive"] } serde_json = { version = "1.0.89" } diff --git a/src/ssh-util/src/tunnel.rs b/src/ssh-util/src/tunnel.rs index 0c9e9ac7ed79d..ea9f237d36b3c 100644 --- a/src/ssh-util/src/tunnel.rs +++ b/src/ssh-util/src/tunnel.rs @@ -7,6 +7,7 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. +use std::collections::BTreeSet; use std::fmt; use std::fs::{self, File}; use std::io::Write; @@ -17,6 +18,7 @@ use std::sync::{Arc, Mutex}; use std::time::Duration; use anyhow::bail; +use itertools::Itertools; use mz_ore::error::ErrorExt; use mz_ore::task::{self, AbortOnDropHandle}; use openssh::{ForwardType, Session}; @@ -64,8 +66,9 @@ impl Default for SshTimeoutConfig { /// Specifies an SSH tunnel. #[derive(Clone, PartialEq, Eq, PartialOrd, Ord)] pub struct SshTunnelConfig { - /// The hostname of the SSH bastion server. - pub host: String, + /// The hostname/IP of the SSH bastion server. + /// If multiple hosts are specified, they are tried in order. + pub host: BTreeSet, /// The port to connect to. pub port: u16, /// The name of the user to connect as. @@ -85,6 +88,18 @@ impl fmt::Debug for SshTunnelConfig { } } +impl fmt::Display for SshTunnelConfig { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!( + f, + "{}@{}:{}", + self.user, + self.host.iter().join(","), + self.port + ) + } +} + /// The status of a running SSH tunnel. #[derive(Clone, Debug)] pub enum SshTunnelStatus { @@ -106,10 +121,7 @@ impl SshTunnelConfig { remote_port: u16, timeout_config: SshTimeoutConfig, ) -> Result { - let tunnel_id = format!( - "{}:{} via {}@{}:{}", - remote_host, remote_port, self.user, self.host, self.port, - ); + let tunnel_id = format!("{}:{} via {}", remote_host, remote_port, self); // N.B. // @@ -236,31 +248,44 @@ async fn connect( // Mostly helpful to ensure the file is not accidentally overwritten. tempfile.set_permissions(std::fs::Permissions::from_mode(0o400))?; - // Bastion hosts (and therefore keys) tend to change, so we don't want - // to lock ourselves into trusting only the first we see. In any case, - // recording a known host would only last as long as the life of a - // storage pod, so it doesn't offer any protection. - let session = openssh::SessionBuilder::default() - .known_hosts_check(openssh::KnownHosts::Accept) - .user_known_hosts_file("/dev/null") - .user(config.user.clone()) - .port(config.port) - .keyfile(&path) - .server_alive_interval(timeout_config.keepalives_idle) - .connect_timeout(timeout_config.connect_timeout) - .connect_mux(config.host.clone()) - .await?; - - // Delete the private key for safety: since `ssh` still has an open - // handle to it, it still has access to the key. - drop(tempfile); - fs::remove_file(&path)?; - drop(tempdir); + // Try connecting to each host in turn. 
+ let mut connect_err = None; + for host in &config.host { + // Bastion hosts (and therefore keys) tend to change, so we don't want + // to lock ourselves into trusting only the first we see. In any case, + // recording a known host would only last as long as the life of a + // storage pod, so it doesn't offer any protection. + match openssh::SessionBuilder::default() + .known_hosts_check(openssh::KnownHosts::Accept) + .user_known_hosts_file("/dev/null") + .user(config.user.clone()) + .port(config.port) + .keyfile(&path) + .server_alive_interval(timeout_config.keepalives_idle) + .connect_timeout(timeout_config.connect_timeout) + .connect_mux(host.clone()) + .await + { + Ok(session) => { + // Delete the private key for safety: since `ssh` still has an open + // handle to it, it still has access to the key. + drop(tempfile); + fs::remove_file(&path)?; + drop(tempdir); - // Ensure session is healthy. - session.check().await?; + // Ensure session is healthy. + session.check().await?; - Ok(session) + return Ok(session); + } + Err(err) => { + connect_err = Some(err); + } + } + } + Err(connect_err + .map(Into::into) + .unwrap_or_else(|| anyhow::anyhow!("no hosts to connect to"))) } async fn port_forward(session: &Session, host: &str, port: u16) -> Result { diff --git a/src/ssh-util/src/tunnel_manager.rs b/src/ssh-util/src/tunnel_manager.rs index 547d4fae1c16b..74a73abc24565 100644 --- a/src/ssh-util/src/tunnel_manager.rs +++ b/src/ssh-util/src/tunnel_manager.rs @@ -109,8 +109,8 @@ impl SshTunnelManager { if let SshTunnelStatus::Errored(e) = handle.check_status() { error!( "not using existing ssh tunnel \ - ({}:{} via {}@{}:{}) because it's broken: {e}", - remote_host, remote_port, config.user, config.host, config.port, + ({}:{} via {}) because it's broken: {e}", + remote_host, remote_port, config ); // This is bit unfortunate, as this method returns an @@ -123,8 +123,8 @@ impl SshTunnelManager { } info!( - "reusing existing ssh tunnel ({}:{} via {}@{}:{})", - remote_host, remote_port, config.user, config.host, config.port, + "reusing existing ssh tunnel ({}:{} via {})", + remote_host, remote_port, config ); return Ok(handle); } @@ -146,8 +146,8 @@ impl SshTunnelManager { // Try to connect. info!( - "initiating new ssh tunnel ({}:{} via {}@{}:{})", - remote_host, remote_port, config.user, config.host, config.port, + "initiating new ssh tunnel ({}:{} via {})", + remote_host, remote_port, config ); let config = config.clone(); @@ -181,6 +181,11 @@ impl SshTunnelManager { } /// Identifies a connection to a remote host via an SSH tunnel. +/// There are a couple of edge cases where this key format may result +/// in extra connections being created: +/// 1. If a host resolves to a different number of ips on different workers +/// 2. Different workers connect to different upstream resolved ips if they +/// appear connectable at different times. #[derive(Debug, Clone, PartialEq, PartialOrd, Eq, Ord)] struct SshTunnelKey { config: SshTunnelConfig, diff --git a/src/storage-types/src/connections.rs b/src/storage-types/src/connections.rs index 4a3e3bcf52b15..4b4f5745f42b6 100644 --- a/src/storage-types/src/connections.rs +++ b/src/storage-types/src/connections.rs @@ -10,7 +10,7 @@ //! Connection types. 
use std::borrow::Cow; -use std::collections::BTreeMap; +use std::collections::{BTreeMap, BTreeSet}; use std::net::SocketAddr; use std::sync::Arc; @@ -24,6 +24,7 @@ use mz_kafka_util::client::{ }; use mz_ore::error::ErrorExt; use mz_ore::future::{InTask, OreFutureExt}; +use mz_ore::netio::resolve_address; use mz_proto::tokio_postgres::any_ssl_mode; use mz_proto::{IntoRustIfSome, ProtoType, RustType, TryFromProtoError}; use mz_repr::url::any_url; @@ -51,7 +52,7 @@ use url::Url; use crate::configuration::StorageConfiguration; use crate::connections::aws::{AwsConnection, AwsConnectionValidationError}; use crate::controller::AlterError; -use crate::dyncfgs::KAFKA_CLIENT_ID_ENRICHMENT_RULES; +use crate::dyncfgs::{ENFORCE_EXTERNAL_ADDRESSES, KAFKA_CLIENT_ID_ENRICHMENT_RULES}; use crate::errors::{ContextCreationError, CsrConnectError}; use crate::AlterCompatible; @@ -661,6 +662,9 @@ impl KafkaConnection { config.set(*k, v); } + // TODO(roshan): Implement enforcement of external address validation once + // rdkafka client has been updated to support providing multiple resolved + // addresses for brokers let mut context = TunnelingClientContext::new( context, Handle::current(), @@ -690,8 +694,17 @@ impl KafkaConnection { .await?; let key_pair = SshKeyPair::from_bytes(&secret)?; + // Ensure any ssh-bastion address we connect to is resolved to an external address. + let resolved = resolve_address( + &ssh_tunnel.connection.host, + ENFORCE_EXTERNAL_ADDRESSES.get(storage_configuration.config_set()), + ) + .await?; context.set_default_tunnel(TunnelConfig::Ssh(SshTunnelConfig { - host: ssh_tunnel.connection.host.clone(), + host: resolved + .iter() + .map(|a| a.to_string()) + .collect::>(), port: ssh_tunnel.connection.port, user: ssh_tunnel.connection.user.clone(), key_pair, @@ -735,11 +748,20 @@ impl KafkaConnection { ); } Tunnel::Ssh(ssh_tunnel) => { + // Ensure any SSH bastion address we connect to is resolved to an external address. + let ssh_host_resolved = resolve_address( + &ssh_tunnel.connection.host, + ENFORCE_EXTERNAL_ADDRESSES.get(storage_configuration.config_set()), + ) + .await?; context .add_ssh_tunnel( addr, SshTunnelConfig { - host: ssh_tunnel.connection.host.clone(), + host: ssh_host_resolved + .iter() + .map(|a| a.to_string()) + .collect::>(), port: ssh_tunnel.connection.port, user: ssh_tunnel.connection.user.clone(), key_pair: SshKeyPair::from_bytes( @@ -1034,15 +1056,28 @@ impl CsrConnection { // incorrectly starts using this port. const DUMMY_PORT: u16 = 11111; + // TODO: use types to enforce that the URL has a string hostname. + let host = self + .url + .host_str() + .ok_or_else(|| anyhow!("url missing host"))?; match &self.tunnel { - Tunnel::Direct => {} + Tunnel::Direct => { + // Ensure any host we connect to is resolved to an external address. + let resolved = resolve_address( + host, + ENFORCE_EXTERNAL_ADDRESSES.get(storage_configuration.config_set()), + ) + .await?; + client_config = client_config.resolve_to_addrs( + host, + &resolved + .iter() + .map(|addr| SocketAddr::new(*addr, DUMMY_PORT)) + .collect::>(), + ) + } Tunnel::Ssh(ssh_tunnel) => { - // TODO: use types to enforce that the URL has a string hostname. - let host = self - .url - .host_str() - .ok_or_else(|| anyhow!("url missing host"))?; - let ssh_tunnel = ssh_tunnel .connect( storage_configuration, @@ -1093,11 +1128,6 @@ impl CsrConnection { Tunnel::AwsPrivatelink(connection) => { assert!(connection.port.is_none()); - // TODO: use types to enforce that the URL has a string hostname. 
- let host = self - .url - .host_str() - .ok_or_else(|| anyhow!("url missing host"))?; let privatelink_host = mz_cloud_resources::vpc_endpoint_host( connection.connection_id, connection.availability_zone.as_deref(), @@ -1332,7 +1362,17 @@ impl PostgresConnection { } let tunnel = match &self.tunnel { - Tunnel::Direct => mz_postgres_util::TunnelConfig::Direct, + Tunnel::Direct => { + // Ensure any host we connect to is resolved to an external address. + let resolved = resolve_address( + &self.host, + ENFORCE_EXTERNAL_ADDRESSES.get(storage_configuration.config_set()), + ) + .await?; + mz_postgres_util::TunnelConfig::Direct { + resolved_ips: Some(resolved), + } + } Tunnel::Ssh(SshTunnel { connection_id, connection, @@ -1341,9 +1381,18 @@ impl PostgresConnection { .read_in_task_if(in_task, *connection_id) .await?; let key_pair = SshKeyPair::from_bytes(&secret)?; + // Ensure any ssh-bastion host we connect to is resolved to an external address. + let resolved = resolve_address( + &connection.host, + ENFORCE_EXTERNAL_ADDRESSES.get(storage_configuration.config_set()), + ) + .await?; mz_postgres_util::TunnelConfig::Ssh { config: SshTunnelConfig { - host: connection.host.clone(), + host: resolved + .iter() + .map(|a| a.to_string()) + .collect::>(), port: connection.port, user: connection.user.clone(), key_pair, @@ -1696,7 +1745,17 @@ impl MySqlConnection { opts = opts.ssl_opts(ssl_opts); let tunnel = match &self.tunnel { - Tunnel::Direct => mz_mysql_util::TunnelConfig::Direct, + Tunnel::Direct => { + // Ensure any host we connect to is resolved to an external address. + let resolved = resolve_address( + &self.host, + ENFORCE_EXTERNAL_ADDRESSES.get(storage_configuration.config_set()), + ) + .await?; + mz_mysql_util::TunnelConfig::Direct { + resolved_ips: Some(resolved), + } + } Tunnel::Ssh(SshTunnel { connection_id, connection, @@ -1705,9 +1764,18 @@ impl MySqlConnection { .read_in_task_if(in_task, *connection_id) .await?; let key_pair = SshKeyPair::from_bytes(&secret)?; + // Ensure any ssh-bastion host we connect to is resolved to an external address. + let resolved = resolve_address( + &connection.host, + ENFORCE_EXTERNAL_ADDRESSES.get(storage_configuration.config_set()), + ) + .await?; mz_mysql_util::TunnelConfig::Ssh { config: SshTunnelConfig { - host: connection.host.clone(), + host: resolved + .iter() + .map(|a| a.to_string()) + .collect::>(), port: connection.port, user: connection.user.clone(), key_pair, @@ -1987,12 +2055,21 @@ impl SshTunnel { remote_port: u16, in_task: InTask, ) -> Result { + // Ensure any ssh-bastion host we connect to is resolved to an external address. + let resolved = resolve_address( + &self.connection.host, + ENFORCE_EXTERNAL_ADDRESSES.get(storage_configuration.config_set()), + ) + .await?; storage_configuration .connection_context .ssh_tunnel_manager .connect( SshTunnelConfig { - host: self.connection.host.clone(), + host: resolved + .iter() + .map(|a| a.to_string()) + .collect::>(), port: self.connection.port, user: self.connection.user.clone(), key_pair: SshKeyPair::from_bytes( @@ -2060,8 +2137,19 @@ impl SshConnection { ) .await?; let key_pair = SshKeyPair::from_bytes(&secret)?; + + // Ensure any ssh-bastion host we connect to is resolved to an external address. 
+ let resolved = resolve_address( + &self.host, + ENFORCE_EXTERNAL_ADDRESSES.get(storage_configuration.config_set()), + ) + .await?; + let config = SshTunnelConfig { - host: self.host.clone(), + host: resolved + .iter() + .map(|a| a.to_string()) + .collect::>(), port: self.port, user: self.user.clone(), key_pair, diff --git a/src/storage-types/src/dyncfgs.rs b/src/storage-types/src/dyncfgs.rs index 0ff0e70bbf869..e19a5cd914328 100644 --- a/src/storage-types/src/dyncfgs.rs +++ b/src/storage-types/src/dyncfgs.rs @@ -48,6 +48,15 @@ pub const KAFKA_CLIENT_ID_ENRICHMENT_RULES: Config serde_json::Value> = "Rules for enriching the `client.id` property of Kafka clients with additional data.", ); +/// Whether or not to enforce that external connection addresses are global +/// (not private or local) when resolving them. +pub const ENFORCE_EXTERNAL_ADDRESSES: Config = Config::new( + "storage_enforce_external_addresses", + false, + "Whether or not to enforce that external connection addresses are global \ + (not private or local) when resolving them", +); + /// Whether or not to prevent buffering the entire _upstream_ snapshot in /// memory when processing it in memory. This is generally understood to reduce /// memory consumption. @@ -78,12 +87,13 @@ pub const STORAGE_UPSERT_MAX_SNAPSHOT_BATCH_BUFFERING: Config> = C "Limit snapshot buffering in upsert.", ); -/// Adds the full set of all compute `Config`s. +/// Adds the full set of all storage `Config`s. pub fn all_dyncfgs(configs: ConfigSet) -> ConfigSet { configs .add(&DELAY_SOURCES_PAST_REHYDRATION) .add(&STORAGE_DOWNGRADE_SINCE_DURING_FINALIZATION) .add(&KAFKA_CLIENT_ID_ENRICHMENT_RULES) + .add(&ENFORCE_EXTERNAL_ADDRESSES) .add(&STORAGE_UPSERT_PREVENT_SNAPSHOT_BUFFERING) .add(&STORAGE_UPSERT_MAX_SNAPSHOT_BATCH_BUFFERING) } diff --git a/src/storage-types/src/errors.rs b/src/storage-types/src/errors.rs index eb8033d2d3b18..d21932a623258 100644 --- a/src/storage-types/src/errors.rs +++ b/src/storage-types/src/errors.rs @@ -1003,6 +1003,8 @@ pub enum ContextCreationError { KafkaError(#[from] KafkaError), #[error(transparent)] Other(#[from] anyhow::Error), + #[error(transparent)] + Io(#[from] std::io::Error), } /// An extension trait for `Result` that makes producing `ContextCreationError`s easier. 
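Review note: `ENFORCE_EXTERNAL_ADDRESSES` above is the single switch behind every `resolve_address(host, enforce)` call in this patch, and the new `Io` error variants exist so resolver failures can surface through these error enums. A condensed sketch of the gate each direct and SSH path applies before opening a socket — the dyncfg read is faked with a constant, and `is_global` mirrors `src/ore/src/netio/dns.rs` above:

```rust
use std::collections::BTreeSet;
use std::io;
use std::net::IpAddr;

// Stand-in for reading `ENFORCE_EXTERNAL_ADDRESSES` from the config set;
// the real flag defaults to `false`, as declared above.
fn enforce_external_addresses() -> bool {
    false
}

// Mirrors `is_global` in `src/ore/src/netio/dns.rs` from this diff.
fn is_global(addr: IpAddr) -> bool {
    match addr {
        IpAddr::V4(ip) => {
            !(ip.is_unspecified() || ip.is_private() || ip.is_loopback() || ip.is_link_local())
        }
        IpAddr::V6(ip) => !(ip.is_loopback() || ip.is_unspecified()),
    }
}

// The check every direct/SSH connect path applies before opening a socket.
fn check_resolved(ips: &BTreeSet<IpAddr>) -> io::Result<()> {
    if enforce_external_addresses() {
        for ip in ips {
            if !is_global(*ip) {
                return Err(io::Error::new(
                    io::ErrorKind::AddrNotAvailable,
                    "address is not global",
                ));
            }
        }
    }
    Ok(())
}

fn main() -> io::Result<()> {
    let ips: BTreeSet<IpAddr> = ["10.0.0.1".parse().unwrap()].into();
    // Succeeds only because enforcement is off by default.
    check_resolved(&ips)
}
```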
@@ -1044,6 +1046,9 @@ where ContextCreationError::KafkaError(e) => { ContextCreationError::Other(anyhow!(anyhow!(e).context(msg))) } + ContextCreationError::Io(e) => { + ContextCreationError::Other(anyhow!(anyhow!(e).context(msg))) + } } }) } @@ -1072,6 +1077,8 @@ pub enum CsrConnectError { #[error(transparent)] Openssl(#[from] openssl::error::ErrorStack), #[error(transparent)] + Io(#[from] std::io::Error), + #[error(transparent)] Other(#[from] anyhow::Error), } diff --git a/src/workspace-hack/Cargo.toml b/src/workspace-hack/Cargo.toml index 18445f6e60382..f5b6e2dce21b1 100644 --- a/src/workspace-hack/Cargo.toml +++ b/src/workspace-hack/Cargo.toml @@ -68,7 +68,7 @@ log = { version = "0.4.17", default-features = false, features = ["std"] } memchr = { version = "2.5.0", features = ["use_std"] } mime_guess = { version = "2.0.3" } mio = { version = "0.8.11", features = ["net", "os-ext"] } -mysql_async = { version = "0.34.1", default-features = false, features = ["binlog", "minimal", "native-tls-tls", "tracing"] } +mysql_async = { git = "https://github.com/MaterializeInc/mysql_async", rev = "6afd2136181f2ec93b7ca7de524f6d02b6f10c1d", default-features = false, features = ["binlog", "minimal", "native-tls-tls", "tracing"] } mysql_common = { version = "0.32.1", default-features = false, features = ["binlog", "chrono"] } native-tls = { version = "0.2.11", default-features = false, features = ["alpn"] } nix = { version = "0.26.1" } @@ -188,7 +188,7 @@ log = { version = "0.4.17", default-features = false, features = ["std"] } memchr = { version = "2.5.0", features = ["use_std"] } mime_guess = { version = "2.0.3" } mio = { version = "0.8.11", features = ["net", "os-ext"] } -mysql_async = { version = "0.34.1", default-features = false, features = ["binlog", "minimal", "native-tls-tls", "tracing"] } +mysql_async = { git = "https://github.com/MaterializeInc/mysql_async", rev = "6afd2136181f2ec93b7ca7de524f6d02b6f10c1d", default-features = false, features = ["binlog", "minimal", "native-tls-tls", "tracing"] } mysql_common = { version = "0.32.1", default-features = false, features = ["binlog", "chrono"] } native-tls = { version = "0.2.11", default-features = false, features = ["alpn"] } nix = { version = "0.26.1" } diff --git a/test/kafka-auth/test-kafka-ssl.td b/test/kafka-auth/test-kafka-ssl.td index 886471c34d08c..e0fd6c7d49f85 100644 --- a/test/kafka-auth/test-kafka-ssl.td +++ b/test/kafka-auth/test-kafka-ssl.td @@ -121,7 +121,7 @@ running # ALTER CONNECTION for Kafka + SSH ! ALTER CONNECTION testdrive_no_reset_connections.public.ssh SET (HOST = 'abcd') WITH (VALIDATE = true); -contains:Could not resolve hostname abcd +contains:failed to lookup address information: Name or service not known ! ALTER CONNECTION testdrive_no_reset_connections.public.ssh RESET (HOST); contains:HOST option is required diff --git a/test/mysql-cdc/mysql-cdc.td b/test/mysql-cdc/mysql-cdc.td index 8df080f555f06..8aff860b55ed5 100644 --- a/test/mysql-cdc/mysql-cdc.td +++ b/test/mysql-cdc/mysql-cdc.td @@ -222,6 +222,19 @@ contains:Connection refused ) contains:Access denied for user 'root' +$ postgres-execute connection=postgres://mz_system:materialize@${testdrive.materialize-internal-sql-addr} +ALTER SYSTEM SET storage_enforce_external_addresses = true + +! 
CREATE CONNECTION private_address TO MYSQL ( + HOST mysql, + USER root, + PASSWORD SECRET mysqlpass + ) +contains:address is not global + +$ postgres-execute connection=postgres://mz_system:materialize@${testdrive.materialize-internal-sql-addr} +ALTER SYSTEM SET storage_enforce_external_addresses = false + ! CREATE SOURCE "mz_source" IN CLUSTER cdc_cluster FROM MYSQL CONNECTION mysql_conn diff --git a/test/pg-cdc/pg-cdc.td b/test/pg-cdc/pg-cdc.td index c08726d68559e..d15a18e3b0d65 100644 --- a/test/pg-cdc/pg-cdc.td +++ b/test/pg-cdc/pg-cdc.td @@ -190,7 +190,7 @@ ALTER SYSTEM SET pg_source_snapshot_statement_timeout = 0 USER postgres, PASSWORD SECRET pgpass ) -contains:error connecting to server: failed to lookup address information: Name or service not known: failed to lookup address +contains:failed to lookup address information: Name or service not known ! CREATE CONNECTION no_such_port TO POSTGRES ( HOST postgres, @@ -226,6 +226,20 @@ contains:password authentication failed for user "postgres" ) contains:database "no_such_dbname" does not exist +$ postgres-execute connection=postgres://mz_system:materialize@${testdrive.materialize-internal-sql-addr} +ALTER SYSTEM SET storage_enforce_external_addresses = true + +! CREATE CONNECTION private_address TO POSTGRES ( + HOST postgres, + DATABASE postgres, + USER postgres, + PASSWORD SECRET pgpass + ) +contains:address is not global + +$ postgres-execute connection=postgres://mz_system:materialize@${testdrive.materialize-internal-sql-addr} +ALTER SYSTEM SET storage_enforce_external_addresses = false + ! CREATE SOURCE "no_such_publication" IN CLUSTER cdc_cluster FROM POSTGRES CONNECTION pg (PUBLICATION 'no_such_publication'); diff --git a/test/source-sink-errors/mzcompose.py b/test/source-sink-errors/mzcompose.py index bff689245df98..5fb962c7adfc2 100644 --- a/test/source-sink-errors/mzcompose.py +++ b/test/source-sink-errors/mzcompose.py @@ -182,8 +182,13 @@ def populate(self, c: Composition) -> None: $ kafka-ingest topic=source-topic format=bytes ABC + # Specify a faster metadata refresh interval so errors are detected every second + # instead of every minute > CREATE SOURCE source1 - FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-source-topic-${testdrive.seed}') + FROM KAFKA CONNECTION kafka_conn ( + TOPIC 'testdrive-source-topic-${testdrive.seed}', + TOPIC METADATA REFRESH INTERVAL '1s' + ) FORMAT BYTES ENVELOPE NONE # WITH ( REMOTE 'clusterd:2100' ) https://github.com/MaterializeInc/materialize/issues/16582 @@ -466,7 +471,7 @@ def assert_recovery(self, c: Composition) -> None: "redpanda", "rpk", "topic", "delete", f"testdrive-source-topic-{seed}" ), expected_error="UnknownTopicOrPartition|topic", - fixage=None + fixage=None, # Re-creating the topic does not restart the source # fixage=lambda c,seed: redpanda_topics(c, "create", seed), ), @@ -499,7 +504,7 @@ def assert_recovery(self, c: Composition) -> None: PgDisruption( name="kill-postgres", breakage=lambda c, _: c.kill("postgres"), - expected_error="error connecting to server|connection closed|deadline has elapsed", + expected_error="error connecting to server|connection closed|deadline has elapsed|failed to lookup address information", fixage=lambda c, _: c.up("postgres"), ), PgDisruption( diff --git a/test/ssh-connection/kafka-source-after-ssh-failure-restart-replica.td b/test/ssh-connection/kafka-source-after-ssh-failure-restart-replica.td index fb8f327289bb6..8a1aa0b5c6223 100644 --- a/test/ssh-connection/kafka-source-after-ssh-failure-restart-replica.td +++ 
b/test/ssh-connection/kafka-source-after-ssh-failure-restart-replica.td
@@ -14,7 +14,6 @@
 > SELECT status
   FROM mz_internal.mz_source_statuses st
   JOIN mz_sources s ON st.id = s.id
-  WHERE error LIKE 'ssh:%' AND
   s.name in ('dynamic_text', 'fixed_text', 'fixed_plus_csr', 'dynamic_plus_csr')
 stalled
 stalled
diff --git a/test/ssh-connection/pg-source-after-ssh-failure.td b/test/ssh-connection/pg-source-after-ssh-failure.td
index 67442f72e93d8..1a9da9cd86538 100644
--- a/test/ssh-connection/pg-source-after-ssh-failure.td
+++ b/test/ssh-connection/pg-source-after-ssh-failure.td
@@ -11,5 +11,5 @@
 > SELECT status
   FROM mz_internal.mz_source_statuses st
   JOIN mz_sources s ON st.id = s.id
-  WHERE s.name = 'mz_source' AND error LIKE 'ssh:%'
+  WHERE s.name = 'mz_source'
 stalled
diff --git a/test/ssh-connection/ssh-connections.td b/test/ssh-connection/ssh-connections.td
index 7aad8e0fde7c6..c1b3ecb169aaf 100644
--- a/test/ssh-connection/ssh-connections.td
+++ b/test/ssh-connection/ssh-connections.td
@@ -120,3 +120,28 @@ contains: still depended upon
 > SELECT name, type FROM mz_connections;
 name type
 ----------------
+
+$ postgres-execute connection=postgres://mz_system:materialize@${testdrive.materialize-internal-sql-addr}
+ALTER SYSTEM SET storage_enforce_external_addresses = true
+
+> CREATE CONNECTION omega TO SSH TUNNEL (
+    HOST 'chaos.example.com',
+    USER 'omega',
+    PORT 22
+  );
+
+# The exact error message is not consistent across environments, so assert only on a stable prefix.
+! VALIDATE CONNECTION omega;
+contains:failed to
+
+> CREATE CONNECTION local TO SSH TUNNEL (
+    HOST 'ssh-bastion-host',
+    USER 'omega',
+    PORT 22
+  );
+
+! VALIDATE CONNECTION local;
+contains:address is not global
+
+$ postgres-execute connection=postgres://mz_system:materialize@${testdrive.materialize-internal-sql-addr}
+ALTER SYSTEM SET storage_enforce_external_addresses = false
diff --git a/test/testdrive/connection-create-external.td b/test/testdrive/connection-create-external.td
new file mode 100644
index 0000000000000..7b1cebfbe61d3
--- /dev/null
+++ b/test/testdrive/connection-create-external.td
@@ -0,0 +1,74 @@
+# Copyright Materialize, Inc. and contributors. All rights reserved.
+#
+# Use of this software is governed by the Business Source License
+# included in the LICENSE file at the root of this repository.
+#
+# As of the Change Date specified in that file, in accordance with
+# the Business Source License, use of this software will be governed
+# by the Apache License, Version 2.0.
+ +$ set-arg-default default-storage-size=1 +$ set-arg-default single-replica-cluster=quickstart + +# +# Test enforcement that connections are made to global non-private IPs + +$ postgres-execute connection=postgres://mz_system:materialize@${testdrive.materialize-internal-sql-addr} +ALTER SYSTEM SET enable_connection_validation_syntax = true +ALTER SYSTEM SET storage_enforce_external_addresses = true + +# Setup kafka topic with schema +# must be a subset of the keys in the rows +$ set keyschema={ + "type": "record", + "name": "Key", + "fields": [ + {"name": "id", "type": "long"} + ] + } + +$ set schema={ + "type" : "record", + "name" : "envelope", + "fields" : [ + { + "name": "before", + "type": [ + { + "name": "row", + "type": "record", + "fields": [ + { + "name": "id", + "type": "long" + }, + { + "name": "creature", + "type": "string" + }] + }, + "null" + ] + }, + { + "name": "after", + "type": ["row", "null"] + } + ] + } + +$ kafka-create-topic topic=csr_test partitions=1 + +$ kafka-ingest format=avro topic=csr_test key-format=avro key-schema=${keyschema} schema=${schema} timestamp=1 +{"id": 1} {"before": {"row": {"id": 1, "creature": "fish"}}, "after": {"row": {"id": 1, "creature": "mudskipper"}}} +{"id": 1} {"before": {"row": {"id": 1, "creature": "mudskipper"}}, "after": {"row": {"id": 1, "creature": "salamander"}}} +{"id": 1} {"before": {"row": {"id": 1, "creature": "salamander"}}, "after": {"row": {"id": 1, "creature": "lizard"}}} + + +! CREATE CONNECTION csr_conn TO CONFLUENT SCHEMA REGISTRY ( + URL '${testdrive.schema-registry-url}' + ) WITH (VALIDATE = true); +contains:address is not global + +$ postgres-execute connection=postgres://mz_system:materialize@${testdrive.materialize-internal-sql-addr} +ALTER SYSTEM SET storage_enforce_external_addresses = false diff --git a/test/testdrive/connection-validation.td b/test/testdrive/connection-validation.td index 36db1e9876dbc..7692368ac1116 100644 --- a/test/testdrive/connection-validation.td +++ b/test/testdrive/connection-validation.td @@ -37,10 +37,10 @@ ALTER SYSTEM SET enable_connection_validation_syntax = true > CREATE CONNECTION invalid_tunnel TO SSH TUNNEL (HOST 'invalid', USER 'invalid', PORT 22) ! CREATE CONNECTION invalid_kafka_conn TO KAFKA (BROKERS ('${testdrive.kafka-addr}' USING SSH TUNNEL invalid_tunnel), SECURITY PROTOCOL PLAINTEXT) -contains:failed to connect to the remote host +contains:failed to lookup address information # Create the connection without validation and validate later > CREATE CONNECTION invalid_kafka_conn TO KAFKA (BROKERS ('${testdrive.kafka-addr}' USING SSH TUNNEL invalid_tunnel), SECURITY PROTOCOL PLAINTEXT) WITH (VALIDATE = false) ! VALIDATE CONNECTION invalid_kafka_conn -contains:failed to connect to the remote host +contains:failed to lookup address information
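Review note: a closing illustration of the `host`/`hostaddr` dance called out twice in `src/postgres-util/src/tunnel.rs` above. `tokio-postgres` insists that the `host` and `hostaddr` lists have equal length, so the single hostname kept for TLS verification is repeated once per resolved IP. A hedged sketch against the `tokio-postgres` 0.7.8 API pinned by this diff (the hostname and addresses are illustrative):

```rust
use std::net::IpAddr;

use tokio_postgres::Config;

// Repeat the single TLS-verification hostname once per resolved IP, since
// `tokio-postgres` requires the `host` and `hostaddr` lists to be the same
// length. Assumes `config` already contains exactly one host entry.
fn pin_addrs(config: &mut Config, host: &str, ips: &[IpAddr]) {
    for (idx, ip) in ips.iter().enumerate() {
        if idx != 0 {
            config.host(host);
        }
        config.hostaddr(*ip);
    }
}

fn main() {
    let mut config = Config::new();
    config.host("db.example.com"); // hostname used for TLS verification
    let ips: Vec<IpAddr> = vec![
        "192.0.2.10".parse().unwrap(),
        "192.0.2.11".parse().unwrap(),
    ];
    pin_addrs(&mut config, "db.example.com", &ips);
}
```

The first `host` entry is set before the loop, which is why the loop skips re-adding it for `idx == 0`; the TCP connection then dials the pinned `hostaddr` values while certificate validation still checks the original hostname.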