5 changes: 3 additions & 2 deletions src/adapter/src/command.rs
@@ -20,6 +20,7 @@ use mz_adapter_types::connection::{ConnectionId, ConnectionIdType};
use mz_auth::password::Password;
use mz_cluster_client::ReplicaId;
use mz_compute_types::ComputeInstanceId;
use mz_compute_types::dataflows::DataflowDescription;
use mz_expr::RowSetFinishing;
use mz_ore::collections::CollectionExt;
use mz_ore::soft_assert_no_log;
@@ -52,7 +53,7 @@ use crate::session::{EndTransactionAction, RowBatchStream, Session};
use crate::statement_logging::{StatementEndedExecutionReason, StatementExecutionStrategy};
use crate::util::Transmittable;
use crate::webhook::AppendWebhookResponse;
use crate::{AdapterNotice, AppendWebhookError, ReadHolds, optimize};
use crate::{AdapterNotice, AppendWebhookError, ReadHolds};

#[derive(Debug)]
pub struct CatalogSnapshot {
@@ -213,7 +214,7 @@ pub enum Command {
},

ExecuteCopyTo {
global_lir_plan: Box<optimize::copy_to::GlobalLirPlan>,
df_desc: Box<DataflowDescription<mz_compute_types::plan::Plan>>,
compute_instance: ComputeInstanceId,
target_replica: Option<ReplicaId>,
source_ids: BTreeSet<GlobalId>,
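Note on the new variant shape: the dataflow description is boxed, the usual Rust idiom for keeping a large payload from inflating every variant of the enum. A minimal standalone sketch of that effect (sizes and names here are illustrative, not the Materialize types):

```rust
// Sketch: an enum is as large as its largest variant, so boxing a big
// payload keeps the whole enum small and cheap to move.
enum Command {
    Ping,
    // Boxed: the variant stores only an 8-byte pointer.
    ExecuteCopyTo { df_desc: Box<[u8; 4096]> },
}

fn main() {
    // With the Box, the enum stays pointer-sized (plus discriminant);
    // without it, it would be at least 4096 bytes.
    assert!(std::mem::size_of::<Command>() <= 16);
    let _cmd = Command::ExecuteCopyTo { df_desc: Box::new([0u8; 4096]) };
    let _ = Command::Ping;
}
```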
4 changes: 2 additions & 2 deletions src/adapter/src/coord/command_handler.rs
@@ -360,7 +360,7 @@ impl Coordinator {
}

Command::ExecuteCopyTo {
global_lir_plan,
df_desc,
compute_instance,
target_replica,
source_ids,
@@ -371,7 +371,7 @@
// through tx when the COPY TO completes (or immediately if setup fails).
// We just call it and let it handle all response sending.
self.implement_copy_to(
*global_lir_plan,
*df_desc,
compute_instance,
target_replica,
source_ids,
10 changes: 4 additions & 6 deletions src/adapter/src/coord/peek.rs
@@ -57,7 +57,7 @@ use uuid::Uuid;

use crate::active_compute_sink::{ActiveComputeSink, ActiveCopyTo};
use crate::coord::timestamp_selection::TimestampDetermination;
use crate::optimize::{self, OptimizerError};
use crate::optimize::OptimizerError;
use crate::statement_logging::{StatementEndedExecutionReason, StatementExecutionStrategy};
use crate::util::ResultExt;
use crate::{AdapterError, ExecuteContextExtra, ExecuteResponse};
@@ -1257,7 +1257,7 @@ impl crate::coord::Coordinator {
/// All errors (setup or execution) are sent through tx.
pub(crate) async fn implement_copy_to(
&mut self,
global_lir_plan: optimize::copy_to::GlobalLirPlan,
df_desc: DataflowDescription<mz_compute_types::plan::Plan>,
compute_instance: ComputeInstanceId,
target_replica: Option<ReplicaId>,
source_ids: BTreeSet<GlobalId>,
@@ -1270,12 +1270,12 @@
let _ = tx.send(Err(e));
};

let sink_id = global_lir_plan.sink_id();
let sink_id = df_desc.sink_id();

// # Inlined from peek_copy_to_preflight

let connection_context = self.connection_context().clone();
let sinks = &global_lir_plan.df_desc().sink_exports;
let sinks = &df_desc.sink_exports;

if sinks.len() != 1 {
send_err(
@@ -1315,8 +1315,6 @@

// # Inlined from peek_copy_to_dataflow

let (df_desc, _df_meta) = global_lir_plan.unapply();

// Create and register ActiveCopyTo.
// Note: sink_tx/sink_rx is the channel for the compute sink to notify completion
// This is different from the command's tx which sends the response to the client
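The inlined preflight above reports every failure through tx rather than returning an error, so the client connection always receives a response. A minimal standalone sketch of that pattern, with stand-in types and messages (none of these names are the Materialize ones):

```rust
// Standalone sketch of the "all errors go through the channel" pattern
// used by implement_copy_to; types and messages are illustrative.
use std::sync::mpsc;

fn implement_copy_to_sketch(
    sink_count: usize,
    tx: mpsc::Sender<Result<&'static str, String>>,
) {
    // Setup failures are sent through tx instead of being returned, so
    // the caller never has to handle a second error path.
    if sink_count != 1 {
        let _ = tx.send(Err(format!("expected exactly one sink, got {sink_count}")));
        return;
    }
    let _ = tx.send(Ok("COPY TO dataflow shipped"));
}

fn main() {
    let (tx, rx) = mpsc::channel();
    implement_copy_to_sketch(2, tx);
    // The setup error arrives as an ordinary response on the channel.
    assert!(rx.recv().unwrap().is_err());
}
```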
30 changes: 19 additions & 11 deletions src/adapter/src/frontend_peek.rs
@@ -910,6 +910,16 @@ impl PeekClient {

let (peek_plan, df_meta, typ) = global_lir_plan.unapply();

// Warning: Do not bail out from the new peek sequencing after this point, because the
// following has side effects. TODO(peek-seq): remove this comment once we never
// bail out to the old sequencing.

coord::sequencer::emit_optimizer_notices(
&*catalog,
session,
&df_meta.optimizer_notices,
);

// Generate plan insights notice if needed
if let Some(trace) = plan_insights_optimizer_trace {
let target_cluster = catalog.get_cluster(target_cluster_id);
@@ -928,16 +938,6 @@
session.add_notice(AdapterNotice::PlanInsights(insights));
}

// Warning: Do not bail out from the new peek sequencing after this point, because the
// following has side effects. TODO(peek-seq): remove this comment once we never
// bail out to the old sequencing.

coord::sequencer::emit_optimizer_notices(
&*catalog,
session,
&df_meta.optimizer_notices,
);

// TODO(peek-seq): move this up to the beginning of the function when we have eliminated all
// the fallbacks to the old peek sequencing. Currently, it has to be here to avoid
// double-counting a fallback situation, but this has the drawback that if we error out
@@ -1009,9 +1009,17 @@
global_lir_plan,
source_ids,
} => {
let (df_desc, df_meta) = global_lir_plan.unapply();
Review comment (Contributor): might have to be _df_meta?

Reply (Author): This is used in the next commit: passed in to emit_optimizer_notices.

coord::sequencer::emit_optimizer_notices(
&*catalog,
session,
&df_meta.optimizer_notices,
);

let response = self
.call_coordinator(|tx| Command::ExecuteCopyTo {
global_lir_plan: Box::new(global_lir_plan),
df_desc: Box::new(df_desc),
compute_instance: target_cluster_id,
target_replica,
source_ids,
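For context, the frontend now splits the plan itself and keeps the metadata local. A minimal standalone sketch of that flow, with stand-in types throughout (GlobalLirPlan::unapply mirrors the real method; everything else is illustrative):

```rust
// Sketch of the unapply-then-notify flow for COPY TO: the frontend
// keeps the metadata (for optimizer notices) and ships only the
// dataflow description to the coordinator. Stand-in types throughout.
struct DataflowDescription;

struct DataflowMetainfo {
    optimizer_notices: Vec<String>,
}

struct GlobalLirPlan {
    df_desc: DataflowDescription,
    df_meta: DataflowMetainfo,
}

impl GlobalLirPlan {
    /// Splits the plan into its shippable and frontend-local parts.
    fn unapply(self) -> (DataflowDescription, DataflowMetainfo) {
        (self.df_desc, self.df_meta)
    }
}

fn main() {
    let plan = GlobalLirPlan {
        df_desc: DataflowDescription,
        df_meta: DataflowMetainfo {
            optimizer_notices: vec!["example notice".into()],
        },
    };
    let (df_desc, df_meta) = plan.unapply();
    // Notices are emitted in the frontend...
    for notice in &df_meta.optimizer_notices {
        println!("notice: {notice}");
    }
    // ...and only the boxed description crosses the command boundary.
    let _to_coordinator = Box::new(df_desc);
}
```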
9 changes: 6 additions & 3 deletions src/adapter/src/optimize/copy_to.rs
@@ -149,10 +149,13 @@ impl GlobalLirPlan {
&self.df_desc
}

/// Returns the id of the dataflow's sink export.
///
/// # Panics
///
/// Panics if the dataflow has no sink exports or has more than one.
pub fn sink_id(&self) -> GlobalId {
let sink_exports = &self.df_desc.sink_exports;
let sink_id = sink_exports.keys().next().expect("valid sink");
*sink_id
self.df_desc.sink_id()
}
}

16 changes: 12 additions & 4 deletions src/adapter/src/optimize/subscribe.rs
@@ -152,19 +152,27 @@ pub struct GlobalLirPlan {
}

impl GlobalLirPlan {
/// Returns the id of the dataflow's sink export.
///
/// # Panics
///
/// Panics if the dataflow has no sink exports or has more than one.
pub fn sink_id(&self) -> GlobalId {
let sink_exports = &self.df_desc.sink_exports;
let sink_id = sink_exports.keys().next().expect("valid sink");
*sink_id
self.df_desc.sink_id()
}

pub fn as_of(&self) -> Option<Timestamp> {
self.df_desc.as_of.clone().map(|as_of| as_of.into_element())
}

/// Returns the description of the dataflow's sink export.
///
/// # Panics
///
/// Panics if the dataflow has no sink exports or has more than one.
pub fn sink_desc(&self) -> &ComputeSinkDesc {
let sink_exports = &self.df_desc.sink_exports;
let sink_desc = sink_exports.values().next().expect("valid sink");
let sink_desc = sink_exports.values().into_element();
sink_desc
}
}
12 changes: 12 additions & 0 deletions src/compute-types/src/dataflows.rs
@@ -13,6 +13,7 @@ use std::collections::{BTreeMap, BTreeSet};
use std::fmt;

use mz_expr::{CollectionPlan, MirRelationExpr, MirScalarExpr, OptimizedMirRelationExpr};
use mz_ore::collections::CollectionExt;
use mz_ore::soft_assert_or_log;
use mz_repr::refresh_schedule::RefreshSchedule;
use mz_repr::{GlobalId, SqlRelationType};
@@ -396,6 +397,17 @@ impl<P, S, T> DataflowDescription<P, S, T> {
assert!(builds.next().is_none());
build
}

/// Returns the id of the dataflow's sink export.
///
/// # Panics
///
/// Panics if the dataflow has no sink exports or has more than one.
pub fn sink_id(&self) -> GlobalId {
Review comment (Contributor): maybe this also wants to panic when there is more than one sink? But I'm not familiar enough with this code.

Reply (Author): I think so, yes. I've now made it enforce that we have exactly one sink.

let sink_exports = &self.sink_exports;
let sink_id = sink_exports.keys().into_element();
*sink_id
}
}

impl<P, S, T> DataflowDescription<P, S, T>
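The exactly-one-sink contract that sink_id and sink_desc document (in copy_to.rs, subscribe.rs, and here) is enforced by into_element, which panics unless the collection holds exactly one element. A minimal standalone sketch of that contract, as a hypothetical reimplementation rather than the mz_ore code:

```rust
// Standalone sketch of the "exactly one element" contract behind
// sink_id/sink_desc. This is a hypothetical stand-in for
// mz_ore::collections::CollectionExt::into_element, not the real code.
use std::collections::BTreeMap;

fn into_element<I: IntoIterator>(collection: I) -> I::Item {
    let mut iter = collection.into_iter();
    let element = iter.next().expect("collection has no elements");
    assert!(iter.next().is_none(), "collection has more than one element");
    element
}

fn main() {
    let mut sink_exports = BTreeMap::new();
    sink_exports.insert(7u64, "copy-to sink");
    // Exactly one export: returns its id.
    assert_eq!(*into_element(sink_exports.keys()), 7);

    // Zero or multiple exports would panic, which is exactly the
    // guarantee the new DataflowDescription::sink_id documents.
    sink_exports.insert(8u64, "second sink");
    let result = std::panic::catch_unwind(|| into_element(sink_exports.keys()));
    assert!(result.is_err());
}
```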