Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions src/builder/analyzer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1120,6 +1120,17 @@ pub async fn analyze_flow(
.with(&flow_inst)?
.with(&flow_schema.schema)?
.into_fingerprint();

// Build a lineage fingerprint that focuses on export-level field lineage.
// It include export op identity, target kind, primary key schema and the
// output value fingerprinter (which is derived from the output value schema)
// so that benign changes (like exclude_pattern on sources or intermediate
// field ordering) that don't affect exported field lineage will not change
// this fingerprint.
let mut lineage_fp = Fingerprinter::default();
lineage_fp = lineage_fp.with(&flow_inst.export_ops)?;
lineage_fp = lineage_fp.with(&flow_schema.schema)?;
let lineage_fingerprint = lineage_fp.into_fingerprint();
Comment on lines +1130 to +1133
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will miss certain information, like the spec changes (e.g. chunk size is changed for SplitRecursively), or behavior version change.

It will also be affected unnecessarily by some information, e.g. a field name (e.g. users may rename products to files in this example), and also by the ordering of ops (users may reorder some functions, which will affect field ordering in the schema; they may also reorder collectors and export ops).

let plan_fut = async move {
let (import_ops, op_scope, export_ops) = try_join3(
try_join_all(import_ops_futs),
Expand All @@ -1130,6 +1141,7 @@ pub async fn analyze_flow(

Ok(ExecutionPlan {
logic_fingerprint,
lineage_fingerprint,
import_ops,
op_scope,
export_ops,
Expand Down
3 changes: 3 additions & 0 deletions src/builder/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,9 @@ pub struct AnalyzedOpScope {

pub struct ExecutionPlan {
pub logic_fingerprint: Fingerprint,
/// Coarser-grained fingerprint that captures field-level lineage relevant to outputs/exports.
/// Changes that don't affect lineage should not modify this fingerprint.
pub lineage_fingerprint: Fingerprint,

pub import_ops: Vec<AnalyzedImportOp>,
pub op_scope: AnalyzedOpScope,
Expand Down
32 changes: 18 additions & 14 deletions src/execution/db_tracking.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ pub struct SourceTrackingInfoForProcessing {
pub processed_source_ordinal: Option<i64>,
pub processed_source_fp: Option<Vec<u8>>,
pub process_logic_fingerprint: Option<Vec<u8>>,
pub process_lineage_fingerprint: Option<Vec<u8>>,
pub max_process_ordinal: Option<i64>,
pub process_ordinal: Option<i64>,
}
Expand All @@ -99,7 +100,7 @@ pub async fn read_source_tracking_info_for_processing(
pool: &PgPool,
) -> Result<Option<SourceTrackingInfoForProcessing>> {
let query_str = format!(
"SELECT memoization_info, processed_source_ordinal, {}, process_logic_fingerprint, max_process_ordinal, process_ordinal FROM {} WHERE source_id = $1 AND source_key = $2",
"SELECT memoization_info, processed_source_ordinal, {}, process_logic_fingerprint, process_lineage_fingerprint, max_process_ordinal, process_ordinal FROM {} WHERE source_id = $1 AND source_key = $2",
if db_setup.has_fast_fingerprint_column {
"processed_source_fp"
} else {
Expand All @@ -124,6 +125,7 @@ pub struct SourceTrackingInfoForPrecommit {
pub processed_source_ordinal: Option<i64>,
pub processed_source_fp: Option<Vec<u8>>,
pub process_logic_fingerprint: Option<Vec<u8>>,
pub process_lineage_fingerprint: Option<Vec<u8>>,
pub process_ordinal: Option<i64>,
pub target_keys: Option<sqlx::types::Json<TrackedTargetKeyForSource>>,
}
Expand All @@ -135,7 +137,7 @@ pub async fn read_source_tracking_info_for_precommit(
db_executor: impl sqlx::Executor<'_, Database = sqlx::Postgres>,
) -> Result<Option<SourceTrackingInfoForPrecommit>> {
let query_str = format!(
"SELECT max_process_ordinal, staging_target_keys, processed_source_ordinal, {}, process_logic_fingerprint, process_ordinal, target_keys FROM {} WHERE source_id = $1 AND source_key = $2",
"SELECT max_process_ordinal, staging_target_keys, processed_source_ordinal, {}, process_logic_fingerprint, process_lineage_fingerprint, process_ordinal, target_keys FROM {} WHERE source_id = $1 AND source_key = $2",
if db_setup.has_fast_fingerprint_column {
"processed_source_fp"
} else {
Expand Down Expand Up @@ -240,6 +242,7 @@ pub async fn commit_source_tracking_info(
processed_source_ordinal: Option<i64>,
processed_source_fp: Option<Vec<u8>>,
logic_fingerprint: &[u8],
lineage_fingerprint: &[u8],
process_ordinal: i64,
process_time_micros: i64,
target_keys: TrackedTargetKeyForSource,
Expand All @@ -252,25 +255,25 @@ pub async fn commit_source_tracking_info(
"INSERT INTO {} ( \
source_id, source_key, \
max_process_ordinal, staging_target_keys, \
processed_source_ordinal, process_logic_fingerprint, process_ordinal, process_time_micros, target_keys{}) \
VALUES ($1, $2, $6 + 1, $3, $4, $5, $6, $7, $8{})",
processed_source_ordinal, process_logic_fingerprint, process_lineage_fingerprint, process_ordinal, process_time_micros, target_keys{}) \
VALUES ($1, $2, $7 + 1, $3, $4, $5, $6, $7, $8, $9{})",
db_setup.table_name,
if db_setup.has_fast_fingerprint_column {
", processed_source_fp"
} else {
""
},
if db_setup.has_fast_fingerprint_column {
", $9"
", $10"
} else {
""
},
),
WriteAction::Update => format!(
"UPDATE {} SET staging_target_keys = $3, processed_source_ordinal = $4, process_logic_fingerprint = $5, process_ordinal = $6, process_time_micros = $7, target_keys = $8{} WHERE source_id = $1 AND source_key = $2",
"UPDATE {} SET staging_target_keys = $3, processed_source_ordinal = $4, process_logic_fingerprint = $5, process_lineage_fingerprint = $6, process_ordinal = $7, process_time_micros = $8, target_keys = $9{} WHERE source_id = $1 AND source_key = $2",
db_setup.table_name,
if db_setup.has_fast_fingerprint_column {
", processed_source_fp = $9"
", processed_source_fp = $10"
} else {
""
},
Expand All @@ -282,12 +285,12 @@ pub async fn commit_source_tracking_info(
.bind(sqlx::types::Json(staging_target_keys)) // $3
.bind(processed_source_ordinal) // $4
.bind(logic_fingerprint) // $5
.bind(process_ordinal) // $6
.bind(process_time_micros) // $7
.bind(sqlx::types::Json(target_keys)); // $8

.bind(lineage_fingerprint) // $6
.bind(process_ordinal) // $7
.bind(process_time_micros) // $8
.bind(sqlx::types::Json(target_keys)); // $9
if db_setup.has_fast_fingerprint_column {
query = query.bind(processed_source_fp); // $9
query = query.bind(processed_source_fp); // $10
}
query.execute(db_executor).await?;

Expand Down Expand Up @@ -317,7 +320,8 @@ pub struct TrackedSourceKeyMetadata {
pub source_key: serde_json::Value,
pub processed_source_ordinal: Option<i64>,
pub processed_source_fp: Option<Vec<u8>>,
pub process_logic_fingerprint: Option<Vec<u8>>,
// pub process_logic_fingerprint: Option<Vec<u8>>,
pub process_lineage_fingerprint: Option<Vec<u8>>,
pub max_process_ordinal: Option<i64>,
pub process_ordinal: Option<i64>,
}
Expand All @@ -341,7 +345,7 @@ impl ListTrackedSourceKeyMetadataState {
) -> impl Stream<Item = Result<TrackedSourceKeyMetadata, sqlx::Error>> + 'a {
self.query_str = format!(
"SELECT \
source_key, processed_source_ordinal, {}, process_logic_fingerprint, max_process_ordinal, process_ordinal \
source_key, processed_source_ordinal, {}, process_logic_fingerprint, process_lineage_fingerprint, max_process_ordinal, process_ordinal \
FROM {} WHERE source_id = $1",
if db_setup.has_fast_fingerprint_column {
"processed_source_fp"
Expand Down
1 change: 1 addition & 0 deletions src/execution/db_tracking_setup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ async fn upgrade_tracking_table(
processed_source_ordinal BIGINT,
{opt_fast_fingerprint_column}
process_logic_fingerprint BYTEA,
process_lineage_fingerprint BYTEA,
process_ordinal BIGINT,
process_time_micros BIGINT,
target_keys JSONB,
Expand Down
117 changes: 104 additions & 13 deletions src/execution/row_indexer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,14 +54,14 @@ pub struct SourceVersion {
impl SourceVersion {
pub fn from_stored(
stored_ordinal: Option<i64>,
stored_fp: &Option<Vec<u8>>,
curr_fp: Fingerprint,
stored_lineage_fp: &Option<Vec<u8>>,
curr_lineage_fp: Fingerprint,
) -> Self {
Self {
ordinal: Ordinal(stored_ordinal),
kind: match &stored_fp {
kind: match &stored_lineage_fp {
Some(stored_fp) => {
if stored_fp.as_slice() == curr_fp.0.as_slice() {
if stored_fp.as_slice() == curr_lineage_fp.0.as_slice() {
SourceVersionKind::CurrentLogic
} else {
SourceVersionKind::DifferentLogic
Expand All @@ -74,23 +74,23 @@ impl SourceVersion {

pub fn from_stored_processing_info(
info: &db_tracking::SourceTrackingInfoForProcessing,
curr_fp: Fingerprint,
curr_lineage_fp: Fingerprint,
) -> Self {
Self::from_stored(
info.processed_source_ordinal,
&info.process_logic_fingerprint,
curr_fp,
&info.process_lineage_fingerprint,
curr_lineage_fp,
)
}

pub fn from_stored_precommit_info(
info: &db_tracking::SourceTrackingInfoForPrecommit,
curr_fp: Fingerprint,
curr_lineage_fp: Fingerprint,
) -> Self {
Self::from_stored(
info.processed_source_ordinal,
&info.process_logic_fingerprint,
curr_fp,
&info.process_lineage_fingerprint,
curr_lineage_fp,
)
}

Expand Down Expand Up @@ -119,9 +119,16 @@ impl SourceVersion {
// Never process older ordinals to maintain consistency
let should_skip = match (self.ordinal.0, target.ordinal.0) {
(Some(existing_ordinal), Some(target_ordinal)) => {
// Skip if target ordinal is older, or same ordinal with same/older logic version
existing_ordinal > target_ordinal
|| (existing_ordinal == target_ordinal && self.kind >= target.kind)
// Skip if target ordinal is older
if existing_ordinal > target_ordinal {
true
// If ordinals are equal, only skip if logic is unchanged (CurrentLogic)
} else if existing_ordinal == target_ordinal {
self.kind == SourceVersionKind::CurrentLogic
&& target.kind == SourceVersionKind::CurrentLogic
} else {
false
}
}
_ => false,
};
Expand Down Expand Up @@ -835,6 +842,7 @@ impl<'a> RowIndexer<'a> {
source_version.ordinal.into(),
source_fp,
&self.src_eval_ctx.plan.logic_fingerprint.0,
&self.src_eval_ctx.plan.lineage_fingerprint.0,
precommit_metadata.process_ordinal,
self.process_time.timestamp_micros(),
precommit_metadata.new_target_keys,
Expand Down Expand Up @@ -1076,4 +1084,87 @@ mod tests {
"After optimization, same ordinal should be skipped"
);
}

#[test]
fn test_lineage_fingerprint_skip_logic_happy_path() {
    // When the lineage fingerprint matches (CurrentLogic on both sides) and the
    // ordinal is unchanged, reprocessing should be skipped.
    let stored = SourceVersion {
        ordinal: Ordinal(Some(100)),
        kind: SourceVersionKind::CurrentLogic,
    };
    let same_ordinal = SourceVersion {
        ordinal: Ordinal(Some(100)),
        kind: SourceVersionKind::CurrentLogic,
    };
    assert!(stored.should_skip(&same_ordinal, None));

    // A strictly newer ordinal must always be processed, never skipped.
    let newer = SourceVersion {
        ordinal: Ordinal(Some(101)),
        kind: SourceVersionKind::CurrentLogic,
    };
    assert!(!stored.should_skip(&newer, None));
}

#[test]
fn test_lineage_fingerprint_skip_logic_benign_change() {
    // A benign change leaves the lineage fingerprint intact (CurrentLogic) but
    // bumps the source ordinal; the newer ordinal must still be processed.
    let stored = SourceVersion {
        ordinal: Ordinal(Some(100)),
        kind: SourceVersionKind::CurrentLogic,
    };
    let bumped = SourceVersion {
        ordinal: Ordinal(Some(101)),
        kind: SourceVersionKind::CurrentLogic,
    };
    assert!(!stored.should_skip(&bumped, None));
}

#[test]
fn test_lineage_fingerprint_skip_logic_breaking_change() {
    // A breaking change flips the target to DifferentLogic; even with an equal
    // ordinal the row must be reprocessed rather than skipped.
    let stored = SourceVersion {
        ordinal: Ordinal(Some(100)),
        kind: SourceVersionKind::CurrentLogic,
    };
    let changed_logic = SourceVersion {
        ordinal: Ordinal(Some(100)),
        kind: SourceVersionKind::DifferentLogic,
    };
    assert!(!stored.should_skip(&changed_logic, None));
}

#[test]
fn test_lineage_fingerprint_skip_logic_edge_cases() {
    // Small helper: a version with the given ordinal at current logic.
    let current = |ordinal: Option<i64>| SourceVersion {
        ordinal: Ordinal(ordinal),
        kind: SourceVersionKind::CurrentLogic,
    };

    // Missing ordinals on both sides: never skip.
    assert!(!current(None).should_skip(&current(None), None));

    // Stored ordinal older than the target: must process the newer data.
    assert!(!current(Some(99)).should_skip(&current(Some(100)), None));

    // Stored ordinal newer than the target: skip the stale update.
    assert!(current(Some(101)).should_skip(&current(Some(100)), None));
}
}
4 changes: 2 additions & 2 deletions src/execution/source_indexer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -283,8 +283,8 @@ impl SourceIndexingContext {
version_state: SourceRowVersionState {
source_version: SourceVersion::from_stored(
key_metadata.processed_source_ordinal,
&key_metadata.process_logic_fingerprint,
plan.logic_fingerprint,
&key_metadata.process_lineage_fingerprint,
plan.lineage_fingerprint,
),
content_version_fp: key_metadata.processed_source_fp,
},
Expand Down