Skip to content

Commit 9dfc8d4

Browse files
authored
ESQL: Handle right hand side of Inline Stats coming optimized with LocalRelation shortcut (#135011)
This PR covers a small range of edge cases where the logical optimization rules "compete" in optimizing aggregations into a simplified tree that is either a `LocalRelation` by itself or that has a `LocalRelation` as its source leaf node. When the right-hand side becomes a non-concrete source, it is now in one of two situations:
- the `EsqlSession` ran the left-hand side query and the results are modeled as a `LocalRelation` which replaces the `StubRelation` of the right-hand side
- the logical optimization flow optimized away the right-hand side (essentially transforming it into a local query, with no ES data involved)

The code in this PR covers the second scenario above. There are a few rules which touch the idea of `inline stats` and which optimize away the right-hand side, but there are also others which are generic and have no different behavior for `inline stats`.
* `PruneEmptyPlans` is one rule where I debated a lot whether it should also treat `StubRelation` as an "empty" plan or remain as is. I decided that the particularities of `inlinejoin` are not the concern of this rule, but of the `EsqlSession`, where the "coordination" of the `inlinejoin` mostly happens.
* `PruneColumns` is another rule where the pruning of the right-hand side is handled
* lastly, `ReplaceStatsFilteredAggWithEval` deals with pruning `inline stats` and `stats` that have filters that can also be pruned
1 parent 8f8664a commit 9dfc8d4

File tree

8 files changed

+123
-19
lines changed

8 files changed

+123
-19
lines changed

docs/changelog/135011.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
pr: 135011
2+
summary: Handle right hand side of Inline Stats coming optimized with `LocalRelation`
3+
shortcut
4+
area: ES|QL
5+
type: bug
6+
issues: []

x-pack/plugin/esql/qa/testFixtures/src/main/resources/inlinestats.csv-spec

Lines changed: 40 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1808,9 +1808,9 @@ from employees
18081808
10003 |Parto |null |0 |null |M |1 |4
18091809
;
18101810

1811-
prunedInlinestatsFollowedByInlinestats_GroupByOneFieldOnSecondInlinestats-Ignore
1812-
// values doesn't end up as null
1811+
prunedInlinestatsFollowedByInlinestats_GroupByOneFieldOnSecondInlinestats
18131812
required_capability: inline_stats
1813+
required_capability: inline_stats_fix_pruning_null_filter
18141814
from employees
18151815
| eval my_length = length(concat(first_name, null))
18161816
| inline stats count = count(my_length) where false,
@@ -1988,9 +1988,9 @@ null |0 |[Gino, Heping]
19881988
null |[0, 0, 0, 0] |[Berni, Chirstian, Amabile, Berni, Bojan, Chirstian, Anneke, Chirstian]|[Head Human Resources, Reporting Analyst, Support Engineer, Tech Lead]|10004
19891989
;
19901990

1991-
prunedInlinestatsFollowedByinlinestats-Ignore
1992-
// values doesn't end up as null
1991+
prunedInlinestatsFollowedByInlinestats
19931992
required_capability: inline_stats
1993+
required_capability: inline_stats_fix_pruning_null_filter
19941994
from employees
19951995
| eval my_length = length(concat(first_name, null))
19961996
| inline stats count = count(my_length) where false,
@@ -3877,3 +3877,39 @@ city:keyword|c:long
38773877
Raleigh |1
38783878
;
38793879

3880+
inlinestatsOptimizedAsLocalRelation_before_EsqlSession1
3881+
required_capability: inline_stats_fix_optimized_as_local_relation
3882+
3883+
from employees
3884+
| keep first_name, emp_no
3885+
| eval my_length = length(concat(first_name, null))
3886+
| inline stats count = count(my_length) where my_length > 0
3887+
| sort first_name
3888+
| limit 5
3889+
;
3890+
3891+
first_name:s | emp_no:i | my_length:i | count:l
3892+
Alejandro |10059 |null |0
3893+
Amabile |10091 |null |0
3894+
Anneke |10006 |null |0
3895+
Anoosh |10062 |null |0
3896+
Arumugam |10094 |null |0
3897+
;
3898+
3899+
inlinestatsOptimizedAsLocalRelation_before_EsqlSession2
3900+
required_capability: inline_stats_fix_optimized_as_local_relation
3901+
3902+
from employees
3903+
| eval my_length = concat(first_name, null)
3904+
| inline stats count = count(my_length) where my_length is not null
3905+
| keep emp_no, count, my_length
3906+
| sort emp_no
3907+
| limit 3
3908+
;
3909+
3910+
emp_no:i | count:l | my_length:s
3911+
10001 |0 |null
3912+
10002 |0 |null
3913+
10003 |0 |null
3914+
;
3915+

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1543,9 +1543,15 @@ public enum Cap {
15431543
*/
15441544
TS_COMMAND_V0(),
15451545

1546-
FIX_ALIAS_ID_WHEN_DROP_ALL_AGGREGATES
1546+
FIX_ALIAS_ID_WHEN_DROP_ALL_AGGREGATES,
15471547

1548-
;
1548+
/**
1549+
* INLINE STATS fix incorrect pruning of null filtering
1550+
* https://github.com/elastic/elasticsearch/pull/135011
1551+
*/
1552+
INLINE_STATS_FIX_PRUNING_NULL_FILTER(INLINESTATS_V11.enabled),
1553+
1554+
INLINE_STATS_FIX_OPTIMIZED_AS_LOCAL_RELATION(INLINESTATS_V11.enabled);
15491555

15501556
private final boolean enabled;
15511557

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizer.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import org.elasticsearch.xpack.esql.VerificationException;
1111
import org.elasticsearch.xpack.esql.common.Failures;
1212
import org.elasticsearch.xpack.esql.core.type.DataType;
13+
import org.elasticsearch.xpack.esql.optimizer.rules.PruneInlineJoinOnEmptyRightSide;
1314
import org.elasticsearch.xpack.esql.optimizer.rules.logical.BooleanFunctionEqualsElimination;
1415
import org.elasticsearch.xpack.esql.optimizer.rules.logical.BooleanSimplification;
1516
import org.elasticsearch.xpack.esql.optimizer.rules.logical.CombineBinaryComparisons;
@@ -205,7 +206,8 @@ protected static Batch<LogicalPlan> operators(boolean local) {
205206
new PushDownAndCombineOrderBy(),
206207
new PruneRedundantOrderBy(),
207208
new PruneRedundantSortClauses(),
208-
new PruneLeftJoinOnNullMatchingField()
209+
new PruneLeftJoinOnNullMatchingField(),
210+
new PruneInlineJoinOnEmptyRightSide()
209211
);
210212
}
211213

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.esql.optimizer.rules;
9+
10+
import org.elasticsearch.xpack.esql.optimizer.rules.logical.OptimizerRules;
11+
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
12+
import org.elasticsearch.xpack.esql.plan.logical.join.InlineJoin;
13+
import org.elasticsearch.xpack.esql.plan.logical.local.LocalRelation;
14+
15+
public final class PruneInlineJoinOnEmptyRightSide extends OptimizerRules.OptimizerRule<InlineJoin> {
16+
17+
@Override
18+
protected LogicalPlan rule(InlineJoin plan) {
19+
return plan.right() instanceof LocalRelation lr ? InlineJoin.inlineData(plan, lr) : plan;
20+
}
21+
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/PruneColumns.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,8 @@
2424
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
2525
import org.elasticsearch.xpack.esql.plan.logical.Project;
2626
import org.elasticsearch.xpack.esql.plan.logical.Sample;
27+
import org.elasticsearch.xpack.esql.plan.logical.UnaryPlan;
2728
import org.elasticsearch.xpack.esql.plan.logical.join.InlineJoin;
28-
import org.elasticsearch.xpack.esql.plan.logical.local.EmptyLocalSupplier;
2929
import org.elasticsearch.xpack.esql.plan.logical.local.LocalRelation;
3030
import org.elasticsearch.xpack.esql.plan.logical.local.LocalSupplier;
3131
import org.elasticsearch.xpack.esql.planner.PlannerUtils;
@@ -34,6 +34,8 @@
3434
import java.util.ArrayList;
3535
import java.util.List;
3636

37+
import static org.elasticsearch.xpack.esql.optimizer.rules.logical.PruneEmptyPlans.skipPlan;
38+
3739
/**
3840
* Remove unused columns created in the plan, in fields inside eval or aggregations inside stats.
3941
*/
@@ -193,9 +195,9 @@ private static LogicalPlan pruneColumnsInEsRelation(EsRelation esr, AttributeSet
193195
return p;
194196
}
195197

196-
private static LogicalPlan emptyLocalRelation(LogicalPlan plan) {
198+
private static LogicalPlan emptyLocalRelation(UnaryPlan plan) {
197199
// create an empty local relation with no attributes
198-
return new LocalRelation(plan.source(), plan.output(), EmptyLocalSupplier.EMPTY);
200+
return skipPlan(plan);
199201
}
200202

201203
private static boolean isLocalEmptyRelation(LogicalPlan plan) {

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/join/InlineJoin.java

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import java.io.IOException;
2828
import java.util.ArrayList;
2929
import java.util.List;
30+
import java.util.Set;
3031

3132
import static org.elasticsearch.xpack.esql.expression.NamedExpressions.mergeOutputAttributes;
3233
import static org.elasticsearch.xpack.esql.plan.logical.join.JoinTypes.LEFT;
@@ -142,15 +143,29 @@ public record LogicalPlanTuple(LogicalPlan stubReplacedSubPlan, LogicalPlan orig
142143
* \_Limit[1000[INTEGER],false]
143144
* \_LocalRelation[[x{r}#99],[IntVectorBlock[vector=ConstantIntVector[positions=1, value=1]]]]
144145
*/
145-
public static LogicalPlanTuple firstSubPlan(LogicalPlan optimizedPlan) {
146+
public static LogicalPlanTuple firstSubPlan(LogicalPlan optimizedPlan, Set<LocalRelation> subPlansResults) {
146147
Holder<LogicalPlanTuple> subPlan = new Holder<>();
147148
// Collect the first inlinejoin (bottom up in the tree)
148149
optimizedPlan.forEachUp(InlineJoin.class, ij -> {
149150
// extract the right side of the plan and replace its source
150-
if (subPlan.get() == null && ij.right().anyMatch(p -> p instanceof StubRelation)) {
151-
var p = replaceStub(ij.left(), ij.right());
152-
p.setOptimized();
153-
subPlan.set(new LogicalPlanTuple(p, ij.right()));
151+
if (subPlan.get() == null) {
152+
if (ij.right().anyMatch(p -> p instanceof StubRelation)) {
153+
var p = replaceStub(ij.left(), ij.right());
154+
p.setOptimized();
155+
subPlan.set(new LogicalPlanTuple(p, ij.right()));
156+
} else if (ij.right() instanceof LocalRelation relation
157+
&& (subPlansResults.isEmpty() || subPlansResults.contains(relation) == false)
158+
|| ij.right() instanceof LocalRelation == false && ij.right().anyMatch(p -> p instanceof LocalRelation)) {
159+
// In case the plan was optimized further and the StubRelation was replaced with a LocalRelation
160+
// or the right hand side became a LocalRelation altogether, there is no need to replace the source of the
161+
// right-hand side anymore.
162+
var p = ij.right();
163+
p.setOptimized();
164+
subPlan.set(new LogicalPlanTuple(p, ij.right()));
165+
// TODO: INLINE STATS this is essentially an optimization similar to the one in PruneInlineJoinOnEmptyRightSide
166+
// this further supports the idea of running the optimization step again after the substitutions (see EsqlSession
167+
// executeSubPlan() method where we could run the optimizer after the results are replaced in place).
168+
}
154169
}
155170
});
156171
return subPlan.get();

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
import org.elasticsearch.xpack.esql.analysis.Verifier;
4747
import org.elasticsearch.xpack.esql.core.expression.Attribute;
4848
import org.elasticsearch.xpack.esql.core.expression.ReferenceAttribute;
49+
import org.elasticsearch.xpack.esql.core.tree.NodeUtils;
4950
import org.elasticsearch.xpack.esql.core.tree.Source;
5051
import org.elasticsearch.xpack.esql.core.type.DataType;
5152
import org.elasticsearch.xpack.esql.enrich.EnrichPolicyResolver;
@@ -82,6 +83,7 @@
8283
import java.util.ArrayList;
8384
import java.util.Collection;
8485
import java.util.HashMap;
86+
import java.util.HashSet;
8587
import java.util.Iterator;
8688
import java.util.List;
8789
import java.util.Map;
@@ -252,7 +254,8 @@ private void executeSubPlans(
252254
EsqlQueryRequest request,
253255
ActionListener<Result> listener
254256
) {
255-
var subPlan = firstSubPlan(optimizedPlan);
257+
var subPlansResults = new HashSet<LocalRelation>();
258+
var subPlan = firstSubPlan(optimizedPlan, subPlansResults);
256259

257260
// TODO: merge into one method
258261
if (subPlan != null) {
@@ -264,6 +267,7 @@ private void executeSubPlans(
264267
executionInfo,
265268
runner,
266269
request,
270+
subPlansResults,
267271
// Ensure we don't have subplan flag stuck in there on failure
268272
ActionListener.runAfter(listener, executionInfo::finishSubPlans)
269273
);
@@ -281,6 +285,7 @@ private void executeSubPlan(
281285
EsqlExecutionInfo executionInfo,
282286
PlanRunner runner,
283287
EsqlQueryRequest request,
288+
Set<LocalRelation> subPlansResults,
284289
ActionListener<Result> listener
285290
) {
286291
LOGGER.debug("Executing subplan:\n{}", subPlans.stubReplacedSubPlan());
@@ -297,6 +302,7 @@ private void executeSubPlan(
297302
LocalRelation resultWrapper = resultToPlan(subPlans.stubReplacedSubPlan().source(), result);
298303
localRelationBlocks.set(resultWrapper.supplier().get());
299304
var releasingNext = ActionListener.runAfter(next, () -> releaseLocalRelationBlocks(localRelationBlocks));
305+
subPlansResults.add(resultWrapper);
300306

301307
// replace the original logical plan with the backing result
302308
LogicalPlan newLogicalPlan = optimizedPlan.transformUp(
@@ -307,9 +313,10 @@ private void executeSubPlan(
307313
);
308314
// TODO: INLINE STATS can we do better here and further optimize the plan AFTER one of the subplans executed?
309315
newLogicalPlan.setOptimized();
310-
LOGGER.debug("Plan after previous subplan execution:\n{}", newLogicalPlan);
316+
LOGGER.trace("Main plan change after previous subplan execution:\n{}", NodeUtils.diffString(optimizedPlan, newLogicalPlan));
317+
311318
// look for the next inlinejoin plan
312-
var newSubPlan = firstSubPlan(newLogicalPlan);
319+
var newSubPlan = firstSubPlan(newLogicalPlan, subPlansResults);
313320

314321
if (newSubPlan == null) {// run the final "main" plan
315322
executionInfo.finishSubPlans();
@@ -322,7 +329,16 @@ private void executeSubPlan(
322329
);
323330
}));
324331
} else {// continue executing the subplans
325-
executeSubPlan(completionInfoAccumulator, newLogicalPlan, newSubPlan, executionInfo, runner, request, releasingNext);
332+
executeSubPlan(
333+
completionInfoAccumulator,
334+
newLogicalPlan,
335+
newSubPlan,
336+
executionInfo,
337+
runner,
338+
request,
339+
subPlansResults,
340+
releasingNext
341+
);
326342
}
327343
} catch (Exception e) {
328344
// safely release the blocks in case an exception occurs either before, but also after the "final" runner.run() forks off

0 commit comments

Comments
 (0)