Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions doc/user/content/sql/system-catalog/mz_internal.md
Original file line number Diff line number Diff line change
Expand Up @@ -1338,6 +1338,7 @@ The `mz_webhook_sources` table contains a row for each webhook source in the sys
<!-- RELATION_SPEC_UNDOCUMENTED mz_internal.mz_prepared_statement_history -->
<!-- RELATION_SPEC_UNDOCUMENTED mz_internal.mz_recent_sql_text -->
<!-- RELATION_SPEC_UNDOCUMENTED mz_internal.mz_recent_sql_text_redacted -->
<!-- RELATION_SPEC_UNDOCUMENTED mz_internal.mz_replacement_materialized_views -->
<!-- RELATION_SPEC_UNDOCUMENTED mz_internal.mz_session_history -->
<!-- RELATION_SPEC_UNDOCUMENTED mz_internal.mz_show_all_objects -->
<!-- RELATION_SPEC_UNDOCUMENTED mz_internal.mz_show_clusters -->
Expand All @@ -1349,6 +1350,7 @@ The `mz_webhook_sources` table contains a row for each webhook source in the sys
<!-- RELATION_SPEC_UNDOCUMENTED mz_internal.mz_show_indexes -->
<!-- RELATION_SPEC_UNDOCUMENTED mz_internal.mz_show_materialized_views -->
<!-- RELATION_SPEC_UNDOCUMENTED mz_internal.mz_show_network_policies -->
<!-- RELATION_SPEC_UNDOCUMENTED mz_internal.mz_show_replacements -->
<!-- RELATION_SPEC_UNDOCUMENTED mz_internal.mz_show_roles -->
<!-- RELATION_SPEC_UNDOCUMENTED mz_internal.mz_show_schemas -->
<!-- RELATION_SPEC_UNDOCUMENTED mz_internal.mz_show_secrets -->
Expand Down
1 change: 1 addition & 0 deletions misc/python/materialize/mzcompose/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ def get_minimal_system_parameters(
"enable_refresh_every_mvs": "true",
"enable_repr_typecheck": "true",
"enable_cluster_schedule_refresh": "true",
"enable_replacement_materialized_views": "true",
"enable_sql_server_source": "true",
"enable_statement_lifecycle_logging": "true",
"enable_compute_temporal_bucketing": "true",
Expand Down
7 changes: 6 additions & 1 deletion src/adapter/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -396,7 +396,8 @@ impl Catalog {
| CatalogItemType::Func
| CatalogItemType::Secret
| CatalogItemType::Connection
| CatalogItemType::ContinualTask => {
| CatalogItemType::ContinualTask
| CatalogItemType::ReplacementMaterializedView => {
dependencies.extend(global_ids);
}
CatalogItemType::View => {
Expand Down Expand Up @@ -1553,6 +1554,7 @@ pub(crate) fn comment_id_to_audit_object_type(id: CommentObjectId) -> ObjectType
CommentObjectId::ClusterReplica(_) => ObjectType::ClusterReplica,
CommentObjectId::ContinualTask(_) => ObjectType::ContinualTask,
CommentObjectId::NetworkPolicy(_) => ObjectType::NetworkPolicy,
CommentObjectId::ReplacementMaterializedView(_) => ObjectType::ReplacementMaterializedView,
}
}

Expand Down Expand Up @@ -1584,6 +1586,9 @@ pub(crate) fn system_object_type_to_audit_object_type(
mz_sql::catalog::ObjectType::Func => ObjectType::Func,
mz_sql::catalog::ObjectType::ContinualTask => ObjectType::ContinualTask,
mz_sql::catalog::ObjectType::NetworkPolicy => ObjectType::NetworkPolicy,
mz_sql::catalog::ObjectType::ReplacementMaterializedView => {
ObjectType::ReplacementMaterializedView
}
},
SystemObjectType::System => ObjectType::System,
}
Expand Down
9 changes: 6 additions & 3 deletions src/adapter/src/catalog/apply.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1925,7 +1925,8 @@ fn sort_updates_inner(updates: Vec<StateUpdate>) -> Vec<StateUpdate> {
| CatalogItemType::Type
| CatalogItemType::Func
| CatalogItemType::Secret
| CatalogItemType::Connection => push_update(
| CatalogItemType::Connection
| CatalogItemType::ReplacementMaterializedView => push_update(
StateUpdate {
kind: StateUpdateKind::SystemObjectMapping(builtin_item_update),
ts,
Expand Down Expand Up @@ -2046,7 +2047,8 @@ fn sort_updates_inner(updates: Vec<StateUpdate>) -> Vec<StateUpdate> {
CatalogItemType::Table => tables.push(update),
CatalogItemType::View
| CatalogItemType::MaterializedView
| CatalogItemType::Index => derived_items.push(update),
| CatalogItemType::Index
| CatalogItemType::ReplacementMaterializedView => derived_items.push(update),
CatalogItemType::Sink => sinks.push(update),
CatalogItemType::ContinualTask => continual_tasks.push(update),
}
Expand Down Expand Up @@ -2116,7 +2118,8 @@ fn sort_updates_inner(updates: Vec<StateUpdate>) -> Vec<StateUpdate> {
CatalogItemType::Table => tables.push(update),
CatalogItemType::View
| CatalogItemType::MaterializedView
| CatalogItemType::Index => derived_items.push(update),
| CatalogItemType::Index
| CatalogItemType::ReplacementMaterializedView => derived_items.push(update),
CatalogItemType::Sink => sinks.push(update),
CatalogItemType::ContinualTask => continual_tasks.push(update),
}
Expand Down
98 changes: 89 additions & 9 deletions src/adapter/src/catalog/builtin_table_updates.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,18 +26,18 @@ use mz_catalog::builtin::{
MZ_MATERIALIZED_VIEW_REFRESH_STRATEGIES, MZ_MATERIALIZED_VIEWS, MZ_MYSQL_SOURCE_TABLES,
MZ_NETWORK_POLICIES, MZ_NETWORK_POLICY_RULES, MZ_OBJECT_DEPENDENCIES, MZ_OBJECT_GLOBAL_IDS,
MZ_OPERATORS, MZ_PENDING_CLUSTER_REPLICAS, MZ_POSTGRES_SOURCE_TABLES, MZ_POSTGRES_SOURCES,
MZ_PSEUDO_TYPES, MZ_ROLE_AUTH, MZ_ROLE_MEMBERS, MZ_ROLE_PARAMETERS, MZ_ROLES, MZ_SCHEMAS,
MZ_SECRETS, MZ_SESSIONS, MZ_SINKS, MZ_SOURCE_REFERENCES, MZ_SOURCES,
MZ_SQL_SERVER_SOURCE_TABLES, MZ_SSH_TUNNEL_CONNECTIONS, MZ_STORAGE_USAGE_BY_SHARD,
MZ_SUBSCRIPTIONS, MZ_SYSTEM_PRIVILEGES, MZ_TABLES, MZ_TYPE_PG_METADATA, MZ_TYPES, MZ_VIEWS,
MZ_WEBHOOKS_SOURCES,
MZ_PSEUDO_TYPES, MZ_REPLACEMENT_MATERIALIZED_VIEWS, MZ_ROLE_AUTH, MZ_ROLE_MEMBERS,
MZ_ROLE_PARAMETERS, MZ_ROLES, MZ_SCHEMAS, MZ_SECRETS, MZ_SESSIONS, MZ_SINKS,
MZ_SOURCE_REFERENCES, MZ_SOURCES, MZ_SQL_SERVER_SOURCE_TABLES, MZ_SSH_TUNNEL_CONNECTIONS,
MZ_STORAGE_USAGE_BY_SHARD, MZ_SUBSCRIPTIONS, MZ_SYSTEM_PRIVILEGES, MZ_TABLES,
MZ_TYPE_PG_METADATA, MZ_TYPES, MZ_VIEWS, MZ_WEBHOOKS_SOURCES,
};
use mz_catalog::config::AwsPrincipalContext;
use mz_catalog::durable::SourceReferences;
use mz_catalog::memory::error::{Error, ErrorKind};
use mz_catalog::memory::objects::{
CatalogEntry, CatalogItem, ClusterVariant, Connection, ContinualTask, DataSourceDesc, Func,
Index, MaterializedView, Sink, Table, TableDataSource, Type, View,
Index, MaterializedView, ReplacementMaterializedView, Sink, Table, TableDataSource, Type, View,
};
use mz_controller::clusters::{
ManagedReplicaAvailabilityZones, ManagedReplicaLocation, ReplicaLocation,
Expand All @@ -55,7 +55,7 @@ use mz_repr::adt::jsonb::Jsonb;
use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem, PrivilegeMap};
use mz_repr::adt::regex;
use mz_repr::network_policy_id::NetworkPolicyId;
use mz_repr::refresh_schedule::RefreshEvery;
use mz_repr::refresh_schedule::{RefreshEvery, RefreshSchedule};
use mz_repr::role_id::RoleId;
use mz_repr::{CatalogItemId, Datum, Diff, GlobalId, Row, RowPacker, SqlScalarType, Timestamp};
use mz_sql::ast::{ContinualTaskStmt, CreateIndexStatement, Statement, UnresolvedItemName};
Expand Down Expand Up @@ -818,6 +818,10 @@ impl CatalogState {
CatalogItem::ContinualTask(ct) => self.pack_continual_task_update(
id, oid, schema_id, name, owner_id, privileges, ct, diff,
),
CatalogItem::ReplacementMaterializedView(mview) => self
.pack_replacement_materialized_view_update(
id, oid, schema_id, name, owner_id, privileges, mview, diff,
),
};

if !entry.item().is_temporary() {
Expand Down Expand Up @@ -1480,7 +1484,23 @@ impl CatalogState {
diff,
));

if let Some(refresh_schedule) = &mview.refresh_schedule {
Self::pack_refresh_strategy_update(
&id,
mview.refresh_schedule.as_ref(),
diff,
&mut updates,
);

updates
}

fn pack_refresh_strategy_update(
id: &CatalogItemId,
refresh_schedule: Option<&RefreshSchedule>,
diff: Diff,
updates: &mut Vec<BuiltinTableUpdate<&BuiltinTable>>,
) {
if let Some(refresh_schedule) = refresh_schedule {
// This can't be `ON COMMIT`, because that is represented by a `None` instead of an
// empty `RefreshSchedule`.
assert!(!refresh_schedule.is_empty());
Expand Down Expand Up @@ -1537,6 +1557,65 @@ impl CatalogState {
diff,
));
}
}

fn pack_replacement_materialized_view_update(
&self,
id: CatalogItemId,
oid: u32,
schema_id: &SchemaSpecifier,
name: &str,
owner_id: &RoleId,
privileges: Datum,
mview: &ReplacementMaterializedView,
diff: Diff,
) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
let create_stmt = mz_sql::parse::parse(&mview.create_sql)
.unwrap_or_else(|e| {
panic!(
"create_sql cannot be invalid: `{}` --- error: `{}`",
mview.create_sql, e
)
})
.into_element()
.ast;
let query_string = match &create_stmt {
Statement::CreateReplacementMaterializedView(stmt) => {
let mut query_string = stmt.query.to_ast_string_stable();
// PostgreSQL appends a semicolon in `pg_matviews.definition`, we
// do the same for compatibility's sake.
query_string.push(';');
query_string
}
_ => unreachable!(),
};

let mut updates = Vec::new();

updates.push(BuiltinTableUpdate::row(
&*MZ_REPLACEMENT_MATERIALIZED_VIEWS,
Row::pack_slice(&[
Datum::String(&id.to_string()),
Datum::UInt32(oid),
Datum::String(&mview.replaces.to_string()),
Datum::String(&schema_id.to_string()),
Datum::String(name),
Datum::String(&mview.cluster_id.to_string()),
Datum::String(&query_string),
Datum::String(&owner_id.to_string()),
privileges,
Datum::String(&mview.create_sql),
Datum::String(&create_stmt.to_ast_string_redacted()),
]),
diff,
));

Self::pack_refresh_strategy_update(
&id,
mview.refresh_schedule.as_ref(),
diff,
&mut updates,
);

updates
}
Expand Down Expand Up @@ -2284,7 +2363,8 @@ impl CatalogState {
| CommentObjectId::Connection(global_id)
| CommentObjectId::Secret(global_id)
| CommentObjectId::Type(global_id)
| CommentObjectId::ContinualTask(global_id) => global_id.to_string(),
| CommentObjectId::ContinualTask(global_id)
| CommentObjectId::ReplacementMaterializedView(global_id) => global_id.to_string(),
CommentObjectId::Role(role_id) => role_id.to_string(),
CommentObjectId::Database(database_id) => database_id.to_string(),
CommentObjectId::Schema((_, schema_id)) => schema_id.to_string(),
Expand Down
3 changes: 2 additions & 1 deletion src/adapter/src/catalog/consistency.rs
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,8 @@ impl CatalogState {
| CommentObjectId::Connection(item_id)
| CommentObjectId::Type(item_id)
| CommentObjectId::Secret(item_id)
| CommentObjectId::ContinualTask(item_id) => {
| CommentObjectId::ContinualTask(item_id)
| CommentObjectId::ReplacementMaterializedView(item_id) => {
let entry = self.entry_by_id.get(&item_id);
match entry {
None => comment_inconsistencies
Expand Down
3 changes: 2 additions & 1 deletion src/adapter/src/catalog/open.rs
Original file line number Diff line number Diff line change
Expand Up @@ -893,7 +893,8 @@ fn add_new_remove_old_builtin_items_migration(
| CatalogItemType::Func
| CatalogItemType::Secret
| CatalogItemType::Connection
| CatalogItemType::ContinualTask => continue,
| CatalogItemType::ContinualTask
| CatalogItemType::ReplacementMaterializedView => continue,
};
deleted_comments.insert(comment_id);
}
Expand Down
Loading