implement distributed EXPLAIN ANALYZE #182
Conversation
Force-pushed from 7ffad58 to 0b95862.
src/explain.rs (Outdated)
.to_string()),
Some(dist_exec) => {
    // If the plan was distributed, collect metrics from the coordinating stage exec.
    // TODO: Should we move this into the DistributedExec itself or a new ExplainAnalyzeExec?
Looking for thoughts here as well
Looking good!
I think we are missing one very important piece of metrics collection: allowing users to manually traverse the plan and call ExecutionPlan.metrics() on all the nodes. Here is a practical use case: users might have their own custom ExecutionPlan implementations, and there they might be collecting their own user-defined metrics. If that's the case, it's very likely that they want to programmatically traverse the plan looking for their custom nodes and extract the raw collected metric values in order to report them as fields in their logs or traces.
This means that, unfortunately, just being able to display a string with the plan enriched with metrics is not enough to bring feature parity with what DataFusion offers, and my bet is that if we want to satisfy the "walk your plan and collect your metrics programmatically" case, the approach here will probably need to change.
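To make the use case concrete, here is a minimal sketch of such a traversal, assuming recent DataFusion APIs (`ExecutionPlan::name`, `metrics`, and `children`; exact signatures vary across DataFusion versions). A user with a custom node would typically `downcast_ref` via `as_any()` instead of matching on names:

```rust
use datafusion::physical_plan::ExecutionPlan;

/// Walk the plan tree and record (operator name, output_rows) for every node
/// that has collected metrics.
fn collect_output_rows(plan: &dyn ExecutionPlan, out: &mut Vec<(String, usize)>) {
    if let Some(metrics) = plan.metrics() {
        if let Some(rows) = metrics.output_rows() {
            out.push((plan.name().to_string(), rows));
        }
    }
    for child in plan.children() {
        // Recurse into each child; for a custom node, this is where a user
        // would downcast and pull out their user-defined metric values.
        collect_output_rows(child.as_ref(), out);
    }
}
```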
Force-pushed from 2c2645c to 1bb3711.
src/execution_plans/distributed.rs (Outdated)
}
}

pub(crate) fn with_display_ctx(&self, display_ctx: DisplayCtx) -> Self {
Using this to smuggle the display ctx into display_plan_ascii without adding a parameter... Maybe it's cleaner to add an argument to display_plan_ascii?
Yeah, I think it's a bit weird that DisplayCtx is a field in the DistributedExec struct when it's irrelevant for execution purposes.
If you ask me, threading a bool like this:
pub fn display_plan_ascii(plan: &dyn ExecutionPlan, show_metrics: bool) -> String;
is a more than good enough solution for now, and if we find ourselves needing to pass around more fields in the future, we can bring back the extra struct as a means of transporting display configuration.
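For reference, a minimal sketch of what threading that bool might look like. The `render` helper and indentation scheme here are hypothetical (the real crate draws boxed ASCII art); `aggregate_by_name` is a standard `MetricsSet` method:

```rust
use std::fmt::Write;
use datafusion::physical_plan::ExecutionPlan;

pub fn display_plan_ascii(plan: &dyn ExecutionPlan, show_metrics: bool) -> String {
    let mut out = String::new();
    render(plan, 0, show_metrics, &mut out);
    out
}

fn render(plan: &dyn ExecutionPlan, depth: usize, show_metrics: bool, out: &mut String) {
    // One line per node, indented by depth.
    let _ = write!(out, "{}{}", "  ".repeat(depth), plan.name());
    // Only EXPLAIN ANALYZE asks for metrics; plain EXPLAIN passes `false`.
    if show_metrics {
        if let Some(m) = plan.metrics() {
            let _ = write!(out, ", metrics=[{}]", m.aggregate_by_name());
        }
    }
    let _ = writeln!(out);
    for child in plan.children() {
        render(child.as_ref(), depth + 1, show_metrics, out);
    }
}
```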
Force-pushed from 1bb3711 to 9081232.
@@ -48,12 +48,35 @@ mod tests {
│ DataSourceExec: file_groups={6 groups: [[/testdata/tpch/data/lineitem/1.parquet:<int>..<int>, /testdata/tpch/data/lineitem/10.parquet:<int>..<int>, /testdata/tpch/data/lineitem/11.parquet:<int>..<int>], [/testdata/tpch/data/lineitem/11.parquet:<int>..<int>, /testdata/tpch/data/lineitem/12.parquet:<int>..<int>, /testdata/tpch/data/lineitem/13.parquet:<int>..<int>, /testdata/tpch/data/lineitem/14.parquet:<int>..<int>], [/testdata/tpch/data/lineitem/14.parquet:<int>..<int>, /testdata/tpch/data/lineitem/15.parquet:<int>..<int>, /testdata/tpch/data/lineitem/16.parquet:<int>..<int>], [/testdata/tpch/data/lineitem/16.parquet:<int>..<int>, /testdata/tpch/data/lineitem/2.parquet:<int>..<int>, /testdata/tpch/data/lineitem/3.parquet:<int>..<int>, /testdata/tpch/data/lineitem/4.parquet:<int>..<int>], [/testdata/tpch/data/lineitem/4.parquet:<int>..<int>, /testdata/tpch/data/lineitem/5.parquet:<int>..<int>, /testdata/tpch/data/lineitem/6.parquet:<int>..<int>, /testdata/tpch/data/lineitem/7.parquet:<int>..<int>], ...]}, projection=[l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate], file_type=parquet, predicate=l_shipdate@6 <= 1998-09-02, pruning_predicate=l_shipdate_null_count@1 != row_count@2 AND l_shipdate_min@0 <= 1998-09-02, required_guarantees=[]
└──────────────────────────────────────────────────
");
I re-ran all these tests in single node mode and the results are available here: https://github.com/datafusion-contrib/datafusion-distributed/blob/js/full-explain-analyze-rebased-comparison/tests/tpch_validation_test.rs
I looked over the results and the output_rows values match between the distributed results here and the single node results (plus I asked Claude to analyze everything), so it looks like the metrics should be correct 👍🏽
Force-pushed from 9081232 to 4a28097.
@gabotechs This is ready for another review :) Now you can walk the plan programmatically and collect metrics, and we don't display each task separately anymore. I've left some comments and updated the PR description to make it easier to review. Also happy to sync tomorrow on this.
.chunks(num_metrics_per_task_per_node)
.into_iter()
.enumerate()
{
I will likely refactor some of this in the future. I think storing things in flat arrays and using Arc<HashMap<StageKey, Vec<MetricsSetProto>>> makes the code hard to reason about.
🚀 Getting there! Left another round of comments.
src/metrics/task_metrics_rewriter.rs (Outdated)
let plan_with_metrics =
    rewrite_local_plan_with_metrics(dist_exec.plan.clone(), task_metrics.clone())?;
🤔 But do we need to do this for the local plan also? I would expect the local plan to already have collected metrics successfully, as it happened locally.
Yes, we need to do this because we take the metrics from the prepared_plan, which is executed and has metrics, and put them in the plan, which does not have metrics.
We want plan to have metrics because that's the real plan returned by children() that a user would encounter during a traversal. Also, the display code renders the plan and not the prepared_plan, mainly because it has decoded stages, it seems.
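To illustrate the mechanism (an assumption about the approach, not the actual implementation): since prepared_plan and plan share the same tree shape, a pre-order walk over the executed tree yields metrics in an order that can be zipped back onto the displayed tree. A minimal sketch:

```rust
use datafusion::physical_plan::{metrics::MetricsSet, ExecutionPlan};

/// Record each node's metrics in pre-order. Because `prepared_plan` and `plan`
/// have the same shape, index i in `out` corresponds to the i-th node of a
/// pre-order walk over `plan`, where a wrapper node can re-attach the metrics.
fn collect_metrics_preorder(plan: &dyn ExecutionPlan, out: &mut Vec<Option<MetricsSet>>) {
    out.push(plan.metrics());
    for child in plan.children() {
        collect_metrics_preorder(child.as_ref(), out);
    }
}
```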
💯 Leaving a +1 here, but let's make sure that all the remaining TODOs are either:
- Addressed in this PR.
- Tracked by GitHub issues that will get immediately worked on after this PR.
- Removed leaving a comment about why we are fine with a non-perfect state of things.
src/metrics/task_metrics_rewriter.rs (Outdated)
// TODO: This transform is a bit inefficient because we traverse the plan nodes more than once.
// For now, this is acceptable for clarity.
let plan_with_metrics =
🤔 "More than once" is exactly twice, right? If that's the case, it doesn't sound too bad.
I'd try to either address this in this PR, or remove the TODO comment and replace it with one that says we are fine with this tradeoff.
src/metrics/task_metrics_rewriter.rs (Outdated)
// Collect metrics from the DistributedExec.
// TODO: Should we move this into the DistributedExec itself or a new ExplainAnalyzeExec?
You mean move all of rewrite_distributed_plan_with_metrics there? If that's the case, I think it's fine as it is now.
I'll either address the TODO comment or remove it, but I would try to avoid leaving TODO comments that are not tracked by tickets that will get solved immediately after this PR.
Force-pushed from dd506d0 to 09433ce.
This change adds support for displaying a distributed EXPLAIN ANALYZE output with metrics. It updates the TPCH validation tests to assert the EXPLAIN ANALYZE output for each query. I collected the single node results [here](https://github.com/datafusion-contrib/datafusion-distributed/blob/js/full-explain-analyze-rebased-comparison/tests/tpch_validation_test.rs) - using the `output_rows` metric values, we can cross-check that the distributed plan metrics are correct. (A usage sketch follows this description.)

Implementation notes:
- Adds `src/explain.rs`, which stores the main entrypoint for rendering the output string.
- We now rewrite the whole plan before rendering using `rewrite_distributed_plan_with_metrics()`, a new public API that takes an executed `DistributedExec` and wraps each node with the appropriate metrics.
- A user can call this method on an executed plan and traverse it to collect metrics from particular nodes (this may be hard, though, because all nodes are wrapped in `MetricsWrapperExec`...).
- Adds a `show_metrics: bool` param to `display_plan_ascii`.
- Significantly refactors `TaskMetricsRewriter` -> `StageMetricsWriter`. See comment.

Informs: #123

Other follow-up work:
- #185
- #184
- #188
- #189
- #190
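As context for how the feature surfaces to users, here is a hedged sketch of running EXPLAIN ANALYZE through DataFusion's standard SQL interface; with this PR, the same statement against a distributed session renders the distributed plan annotated with metrics. The table path is assumed for illustration:

```rust
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    let ctx = SessionContext::new();
    // Hypothetical data path, mirroring the test data used in this repo.
    ctx.register_parquet(
        "lineitem",
        "testdata/tpch/data/lineitem",
        ParquetReadOptions::default(),
    )
    .await?;
    // EXPLAIN ANALYZE executes the query and prints the plan annotated with
    // runtime metrics such as output_rows and elapsed_compute.
    let df = ctx
        .sql("EXPLAIN ANALYZE SELECT count(*) FROM lineitem")
        .await?;
    df.show().await?;
    Ok(())
}
```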
This change makes the explain output more concise by changing the stage formatting from

```
Tasks: t0:[p0,p1,p2,p3,p4,p5] t1:[p0,p1,p2,p3,p4,p5] t2:[p0,p1,p2,p3,p4,p5] t3:[p0,p1,p2,p3,p4,p5]
```

to

```
Tasks: t0:[p0..p5] t1:[p0..p5] t2:[p0..p5] t3:[p0..p5]
```

Closes: #192
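A minimal sketch of the range compression this implies (a hypothetical helper, assuming the partition indices are sorted ascending, as in the task lists above):

```rust
/// Render a task's partition list compactly: a contiguous ascending run like
/// [0, 1, 2, 3, 4, 5] becomes "p0..p5"; anything else is listed in full.
fn format_partitions(parts: &[usize]) -> String {
    match (parts.first(), parts.last()) {
        (Some(&first), Some(&last))
            if parts.len() > 1 && parts.windows(2).all(|w| w[1] == w[0] + 1) =>
        {
            format!("p{first}..p{last}")
        }
        _ => parts
            .iter()
            .map(|p| format!("p{p}"))
            .collect::<Vec<_>>()
            .join(","),
    }
}

// format_partitions(&[0, 1, 2, 3, 4, 5]) == "p0..p5"
// format_partitions(&[0, 2, 4])          == "p0,p2,p4"
```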
Force-pushed from 09433ce to e5059cf.