Skip to content

Commit 7a0719b

Browse files
committed
adapter: Frontend peek sequencing -- more graceful error handling
1 parent 12571f8 commit 7a0719b

File tree

3 files changed

+57
-6
lines changed

3 files changed

+57
-6
lines changed

src/adapter/src/coord.rs

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ use mz_catalog::memory::objects::{
104104
};
105105
use mz_cloud_resources::{CloudResourceController, VpcEndpointConfig, VpcEndpointEvent};
106106
use mz_compute_client::as_of_selection;
107-
use mz_compute_client::controller::error::InstanceMissing;
107+
use mz_compute_client::controller::error::{DataflowCreationError, InstanceMissing};
108108
use mz_compute_types::ComputeInstanceId;
109109
use mz_compute_types::dataflows::DataflowDescription;
110110
use mz_compute_types::plan::Plan;
@@ -3682,23 +3682,41 @@ impl Coordinator {
36823682

36833683
/// Call into the compute controller to install a finalized dataflow, and
36843684
/// initialize the read policies for its exported readable objects.
3685+
///
3686+
/// # Panics
3687+
///
3688+
/// Panics if dataflow creation fails.
36853689
pub(crate) async fn ship_dataflow(
36863690
&mut self,
36873691
dataflow: DataflowDescription<Plan>,
36883692
instance: ComputeInstanceId,
36893693
subscribe_target_replica: Option<ReplicaId>,
36903694
) {
3695+
self.ship_dataflow_fallible(dataflow, instance, subscribe_target_replica)
3696+
.await
3697+
.unwrap_or_terminate("dataflow creation cannot fail");
3698+
}
3699+
3700+
/// Call into the compute controller to install a finalized dataflow, and
3701+
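/// initialize the read policies for its exported readable objects. If `create_dataflow` fails, its `DataflowCreationError` is returned to the caller and no read policies are initialized.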
3702+
pub(crate) async fn ship_dataflow_fallible(
3703+
&mut self,
3704+
dataflow: DataflowDescription<Plan>,
3705+
instance: ComputeInstanceId,
3706+
subscribe_target_replica: Option<ReplicaId>,
3707+
) -> Result<(), DataflowCreationError> {
36913708
// We must only install read policies for indexes, not for sinks.
36923709
// Sinks are write-only compute collections that don't have read policies.
36933710
let export_ids = dataflow.exported_index_ids().collect();
36943711

36953712
self.controller
36963713
.compute
3697-
.create_dataflow(instance, dataflow, subscribe_target_replica)
3698-
.unwrap_or_terminate("dataflow creation cannot fail");
3714+
.create_dataflow(instance, dataflow, subscribe_target_replica)?;
36993715

37003716
self.initialize_compute_read_policies(export_ids, instance, CompactionWindow::Default)
37013717
.await;
3718+
3719+
Ok(())
37023720
}
37033721

37043722
/// Call into the compute controller to allow writes to the specified IDs

src/adapter/src/coord/peek.rs

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -773,7 +773,9 @@ impl crate::coord::Coordinator {
773773
self.controller
774774
.compute
775775
.create_dataflow(compute_instance, dataflow, None)
776-
.unwrap_or_terminate("cannot fail to create dataflows");
776+
.map_err(
777+
AdapterError::concurrent_dependency_drop_from_dataflow_creation_error,
778+
)?;
777779
self.initialize_compute_read_policies(
778780
output_ids,
779781
compute_instance,
@@ -1332,8 +1334,16 @@ impl crate::coord::Coordinator {
13321334
.await,
13331335
);
13341336

1335-
self.ship_dataflow(df_desc, compute_instance, target_replica)
1336-
.await;
1337+
// Try to ship the dataflow. We handle errors gracefully because dependencies might have
1338+
// disappeared during sequencing.
1339+
if let Err(e) = self
1340+
.ship_dataflow_fallible(df_desc, compute_instance, target_replica)
1341+
.await
1342+
.map_err(AdapterError::concurrent_dependency_drop_from_dataflow_creation_error)
1343+
{
1344+
let _ = tx.send(Err(e));
1345+
return;
1346+
}
13371347

13381348
// Spawn background task to wait for completion
13391349
// We must NOT await sink_rx here directly, as that would block the coordinator's main task.

src/adapter/src/error.rs

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -675,6 +675,29 @@ impl AdapterError {
675675
e @ PeekError::ReadHoldInsufficient(_) => AdapterError::internal("peek error", e),
676676
}
677677
}
678+
679+
pub fn concurrent_dependency_drop_from_dataflow_creation_error(
680+
e: compute_error::DataflowCreationError,
681+
) -> Self {
682+
use compute_error::DataflowCreationError::*;
683+
match e {
684+
InstanceMissing(id) => AdapterError::ConcurrentDependencyDrop {
685+
dependency_kind: "cluster",
686+
dependency_id: id.to_string(),
687+
},
688+
CollectionMissing(id) => AdapterError::ConcurrentDependencyDrop {
689+
dependency_kind: "collection",
690+
dependency_id: id.to_string(),
691+
},
692+
ReplicaMissing(id) => AdapterError::ConcurrentDependencyDrop {
693+
dependency_kind: "replica",
694+
dependency_id: id.to_string(),
695+
},
696+
MissingAsOf | SinceViolation(..) | EmptyAsOfForSubscribe | EmptyAsOfForCopyTo => {
697+
AdapterError::internal("dataflow creation error", e)
698+
}
699+
}
700+
}
678701
}
679702

680703
impl fmt::Display for AdapterError {

0 commit comments

Comments
 (0)