Skip to content

Commit e679ba8

Browse files
committed
Enforce exactly one sink export
1 parent 72327c0 commit e679ba8

File tree

3 files changed

+21
-7
lines changed

3 files changed

+21
-7
lines changed

src/adapter/src/optimize/copy_to.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,11 @@ impl GlobalLirPlan {
149149
&self.df_desc
150150
}
151151

152+
/// Returns the id of the dataflow's sink export.
153+
///
154+
/// # Panics
155+
///
156+
/// Panics if the dataflow has no sink exports or has more than one.
152157
pub fn sink_id(&self) -> GlobalId {
153158
self.df_desc.sink_id()
154159
}

src/adapter/src/optimize/subscribe.rs

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -152,19 +152,27 @@ pub struct GlobalLirPlan {
152152
}
153153

154154
impl GlobalLirPlan {
155+
/// Returns the id of the dataflow's sink export.
156+
///
157+
/// # Panics
158+
///
159+
/// Panics if the dataflow has no sink exports or has more than one.
155160
pub fn sink_id(&self) -> GlobalId {
156-
let sink_exports = &self.df_desc.sink_exports;
157-
let sink_id = sink_exports.keys().next().expect("valid sink");
158-
*sink_id
161+
self.df_desc.sink_id()
159162
}
160163

161164
pub fn as_of(&self) -> Option<Timestamp> {
162165
self.df_desc.as_of.clone().map(|as_of| as_of.into_element())
163166
}
164167

168+
/// Returns the description of the dataflow's sink export.
169+
///
170+
/// # Panics
171+
///
172+
/// Panics if the dataflow has no sink exports or has more than one.
165173
pub fn sink_desc(&self) -> &ComputeSinkDesc {
166174
let sink_exports = &self.df_desc.sink_exports;
167-
let sink_desc = sink_exports.values().next().expect("valid sink");
175+
let sink_desc = sink_exports.values().into_element();
168176
sink_desc
169177
}
170178
}

src/compute-types/src/dataflows.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ use std::collections::{BTreeMap, BTreeSet};
1313
use std::fmt;
1414

1515
use mz_expr::{CollectionPlan, MirRelationExpr, MirScalarExpr, OptimizedMirRelationExpr};
16+
use mz_ore::collections::CollectionExt;
1617
use mz_ore::soft_assert_or_log;
1718
use mz_repr::refresh_schedule::RefreshSchedule;
1819
use mz_repr::{GlobalId, SqlRelationType};
@@ -397,14 +398,14 @@ impl<P, S, T> DataflowDescription<P, S, T> {
397398
build
398399
}
399400

400-
/// Returns the id of the first sink export.
401+
/// Returns the id of the dataflow's sink export.
401402
///
402403
/// # Panics
403404
///
404-
/// Panics if the dataflow has no sink exports.
405+
/// Panics if the dataflow has no sink exports or has more than one.
405406
pub fn sink_id(&self) -> GlobalId {
406407
let sink_exports = &self.sink_exports;
407-
let sink_id = sink_exports.keys().next().expect("valid sink");
408+
let sink_id = sink_exports.keys().into_element();
408409
*sink_id
409410
}
410411
}

0 commit comments

Comments
 (0)