diff --git a/src/adapter/src/coord.rs b/src/adapter/src/coord.rs index 0648ea52ace6b..8b1a5d1966b82 100644 --- a/src/adapter/src/coord.rs +++ b/src/adapter/src/coord.rs @@ -104,7 +104,7 @@ use mz_catalog::memory::objects::{ }; use mz_cloud_resources::{CloudResourceController, VpcEndpointConfig, VpcEndpointEvent}; use mz_compute_client::as_of_selection; -use mz_compute_client::controller::error::InstanceMissing; +use mz_compute_client::controller::error::{DataflowCreationError, InstanceMissing}; use mz_compute_types::ComputeInstanceId; use mz_compute_types::dataflows::DataflowDescription; use mz_compute_types::plan::Plan; @@ -3682,23 +3682,41 @@ impl Coordinator { /// Call into the compute controller to install a finalized dataflow, and /// initialize the read policies for its exported readable objects. + /// + /// # Panics + /// + /// Panics if dataflow creation fails. pub(crate) async fn ship_dataflow( &mut self, dataflow: DataflowDescription, instance: ComputeInstanceId, subscribe_target_replica: Option, ) { + self.try_ship_dataflow(dataflow, instance, subscribe_target_replica) + .await + .unwrap_or_terminate("dataflow creation cannot fail"); + } + + /// Call into the compute controller to install a finalized dataflow, and + /// initialize the read policies for its exported readable objects. + pub(crate) async fn try_ship_dataflow( + &mut self, + dataflow: DataflowDescription, + instance: ComputeInstanceId, + subscribe_target_replica: Option, + ) -> Result<(), DataflowCreationError> { // We must only install read policies for indexes, not for sinks. // Sinks are write-only compute collections that don't have read policies. let export_ids = dataflow.exported_index_ids().collect(); self.controller .compute - .create_dataflow(instance, dataflow, subscribe_target_replica) - .unwrap_or_terminate("dataflow creation cannot fail"); + .create_dataflow(instance, dataflow, subscribe_target_replica)?; self.initialize_compute_read_policies(export_ids, instance, CompactionWindow::Default) .await; + + Ok(()) } /// Call into the compute controller to allow writes to the specified IDs diff --git a/src/adapter/src/coord/peek.rs b/src/adapter/src/coord/peek.rs index fa5b1291dab1e..61d254c8738f7 100644 --- a/src/adapter/src/coord/peek.rs +++ b/src/adapter/src/coord/peek.rs @@ -773,7 +773,9 @@ impl crate::coord::Coordinator { self.controller .compute .create_dataflow(compute_instance, dataflow, None) - .unwrap_or_terminate("cannot fail to create dataflows"); + .map_err( + AdapterError::concurrent_dependency_drop_from_dataflow_creation_error, + )?; self.initialize_compute_read_policies( output_ids, compute_instance, @@ -1332,8 +1334,16 @@ impl crate::coord::Coordinator { .await, ); - self.ship_dataflow(df_desc, compute_instance, target_replica) - .await; + // Try to ship the dataflow. We handle errors gracefully because dependencies might have + // disappeared during sequencing. + if let Err(e) = self + .try_ship_dataflow(df_desc, compute_instance, target_replica) + .await + .map_err(AdapterError::concurrent_dependency_drop_from_dataflow_creation_error) + { + let _ = tx.send(Err(e)); + return; + } // Spawn background task to wait for completion // We must NOT await sink_rx here directly, as that would block the coordinator's main task diff --git a/src/adapter/src/error.rs b/src/adapter/src/error.rs index b21707e5f4748..afc4bb7e5eb41 100644 --- a/src/adapter/src/error.rs +++ b/src/adapter/src/error.rs @@ -675,6 +675,29 @@ impl AdapterError { e @ PeekError::ReadHoldInsufficient(_) => AdapterError::internal("peek error", e), } } + + pub fn concurrent_dependency_drop_from_dataflow_creation_error( + e: compute_error::DataflowCreationError, + ) -> Self { + use compute_error::DataflowCreationError::*; + match e { + InstanceMissing(id) => AdapterError::ConcurrentDependencyDrop { + dependency_kind: "cluster", + dependency_id: id.to_string(), + }, + CollectionMissing(id) => AdapterError::ConcurrentDependencyDrop { + dependency_kind: "collection", + dependency_id: id.to_string(), + }, + ReplicaMissing(id) => AdapterError::ConcurrentDependencyDrop { + dependency_kind: "replica", + dependency_id: id.to_string(), + }, + MissingAsOf | SinceViolation(..) | EmptyAsOfForSubscribe | EmptyAsOfForCopyTo => { + AdapterError::internal("dataflow creation error", e) + } + } + } } impl fmt::Display for AdapterError {