
Commit 02fd607

logical plan fragmentation implementation
Signed-off-by: rubywtl <[email protected]>
1 parent eab2099 · commit 02fd607

8 files changed: +208 −101 lines changed

pkg/distributed_execution/codec_test.go

Lines changed: 1 addition & 1 deletion
@@ -34,7 +34,7 @@ func TestUnmarshalWithLogicalPlan(t *testing.T) {
 
 	for _, tc := range testCases {
 		t.Run(tc.name, func(t *testing.T) {
-			plan, _, err := CreateTestLogicalPlan(tc.query, start, end, step)
+			plan, err := CreateTestLogicalPlan(tc.query, start, end, step)
 			require.NoError(t, err)
 			require.NotNil(t, plan)

pkg/distributed_execution/distributed_optimizer.go

Lines changed: 2 additions & 2 deletions
@@ -1,10 +1,9 @@
 package distributed_execution
 
 import (
-	"github.com/thanos-io/promql-engine/query"
-
 	"github.com/prometheus/prometheus/util/annotations"
 	"github.com/thanos-io/promql-engine/logicalplan"
+	"github.com/thanos-io/promql-engine/query"
 )
 
 // This is a simplified implementation that only handles binary aggregation cases
@@ -18,6 +17,7 @@ type DistributedOptimizer struct{}
 func (d *DistributedOptimizer) Optimize(root logicalplan.Node, opts *query.Options) (logicalplan.Node, annotations.Annotations) {
 	warns := annotations.New()
 
+	// insert remote nodes
 	logicalplan.TraverseBottomUp(nil, &root, func(parent, current *logicalplan.Node) bool {
 
 		if (*current).Type() == logicalplan.BinaryNode && d.hasAggregation(current) {
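
The hunk ends just before the body of that branch. As a hedged sketch only (not the committed code), the rewrite presumably wraps each child of the matching binary node in a Remote node, which is what marks a fragment boundary for the fragmenter below; Remote and its Expr field appear later in this commit, while the wrapping loop itself is an assumption:

// Sketch of the assumed branch body: turn each child of an
// aggregation-bearing binary node into a fragment boundary by
// wrapping it in a Remote node (defined in this package).
for _, child := range (*current).Children() {
	*child = &Remote{Expr: *child} // assumed rewrite, not the committed body
}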

pkg/distributed_execution/distributed_optimizer_test.go

Lines changed: 1 addition & 49 deletions
@@ -4,10 +4,8 @@ import (
 	"testing"
 	"time"
 
-	"github.com/prometheus/prometheus/promql/parser"
 	"github.com/stretchr/testify/require"
 	"github.com/thanos-io/promql-engine/logicalplan"
-	"github.com/thanos-io/promql-engine/query"
 )
 
 func TestDistributedOptimizer(t *testing.T) {
@@ -58,7 +56,7 @@ func TestDistributedOptimizer(t *testing.T) {
 
 	for _, tc := range testCases {
 		t.Run(tc.name, func(t *testing.T) {
-			lp, _, err := CreateTestLogicalPlan(tc.query, now, now, time.Minute)
+			lp, err := CreateTestLogicalPlan(tc.query, now, now, time.Minute)
 			require.NoError(t, err)
 
 			node := (*lp).Root()
@@ -75,49 +73,3 @@ func TestDistributedOptimizer(t *testing.T) {
 		})
 	}
 }
-
-func getStartAndEnd(start time.Time, end time.Time, step time.Duration) (time.Time, time.Time) {
-	if step == 0 {
-		return start, start
-	}
-	return start, end
-}
-
-func CreateTestLogicalPlan(qs string, start time.Time, end time.Time, step time.Duration) (*logicalplan.Plan, query.Options, error) {
-
-	start, end = getStartAndEnd(start, end, step)
-
-	qOpts := query.Options{
-		Start:      start,
-		End:        end,
-		Step:       step,
-		StepsBatch: 10,
-		NoStepSubqueryIntervalFn: func(duration time.Duration) time.Duration {
-			return 0
-		},
-		LookbackDelta:      0,
-		EnablePerStepStats: false,
-	}
-
-	expr, err := parser.NewParser(qs, parser.WithFunctions(parser.Functions)).ParseExpr()
-	if err != nil {
-		return nil, qOpts, err
-	}
-
-	planOpts := logicalplan.PlanOptions{
-		DisableDuplicateLabelCheck: false,
-	}
-
-	logicalPlan, err := logicalplan.NewFromAST(expr, &qOpts, planOpts)
-	if err != nil {
-		return nil, qOpts, err
-	}
-	optimizedPlan, _ := logicalPlan.Optimize(logicalplan.DefaultOptimizers)
-
-	distributedOptimizer := DistributedOptimizer{}
-	dOptimizedNode, _ := distributedOptimizer.Optimize(optimizedPlan.Root(), &qOpts)
-
-	plan := logicalplan.New(dOptimizedNode, &qOpts, planOpts)
-
-	return &plan, qOpts, nil
-}

pkg/distributed_execution/plan_fragments/fragmenter.go

Lines changed: 88 additions & 12 deletions
@@ -1,27 +1,103 @@
 package plan_fragments
 
-import "github.com/thanos-io/promql-engine/logicalplan"
+import (
+	"encoding/binary"
+
+	"github.com/google/uuid"
+	"github.com/thanos-io/promql-engine/logicalplan"
+
+	"github.com/cortexproject/cortex/pkg/distributed_execution"
+)
 
 // Fragmenter interface
 type Fragmenter interface {
 	// Fragment function fragments the logical query plan and will always return the fragment in the order of child-to-root
 	// in other words, the order of the fragment in the array will be the order they are being scheduled
-	Fragment(node logicalplan.Node) ([]Fragment, error)
+	Fragment(queryID uint64, node logicalplan.Node) ([]Fragment, error)
 }
 
-type DummyFragmenter struct {
+func getNewID() uint64 {
+	id := uuid.New()
+	return binary.BigEndian.Uint64(id[:8])
 }
 
-func (f *DummyFragmenter) Fragment(node logicalplan.Node) ([]Fragment, error) {
-	// simple logic without distributed optimizer
-	return []Fragment{
-		{
+type PlanFragmenter struct {
+}
+
+func (f *PlanFragmenter) Fragment(queryID uint64, node logicalplan.Node) ([]Fragment, error) {
+	fragments := []Fragment{}
+
+	nodeToFragmentID := make(map[*logicalplan.Node]uint64)
+	nodeToSubtreeFragmentIDs := make(map[*logicalplan.Node][]uint64)
+
+	logicalplan.TraverseBottomUp(nil, &node, func(parent, current *logicalplan.Node) bool {
+		childFragmentIDs := make(map[uint64]bool)
+		children := (*current).Children()
+
+		for _, child := range children {
+			if subtreeIDs, exists := nodeToSubtreeFragmentIDs[child]; exists {
+				for _, fragmentID := range subtreeIDs {
+					childFragmentIDs[fragmentID] = true
+				}
+			}
+		}
+
+		childIDs := make([]uint64, 0, len(childFragmentIDs))
+		for fragmentID := range childFragmentIDs {
+			childIDs = append(childIDs, fragmentID)
+		}
+
+		if parent == nil { // root fragment
+			newFragment := Fragment{
+				Node:       *current,
+				FragmentID: getNewID(),
+				ChildIDs:   childIDs,
+				IsRoot:     true,
+			}
+			fragments = append(fragments, newFragment)
+
+			// cache subtree fragment IDs for this node
+			nodeToSubtreeFragmentIDs[current] = childIDs
+
+		} else if distributed_execution.RemoteNode == (*current).Type() {
+			remoteNode := (*current).(*distributed_execution.Remote)
+			fragmentID := getNewID()
+			nodeToFragmentID[current] = fragmentID
+
+			// Set the fragment key for the remote node
+			key := distributed_execution.MakeFragmentKey(queryID, fragmentID)
+			remoteNode.FragmentKey = key
+
+			newFragment := Fragment{
+				Node:       remoteNode.Expr,
+				FragmentID: fragmentID,
+				ChildIDs:   childIDs,
+				IsRoot:     false,
+			}
+
+			fragments = append(fragments, newFragment)
+
+			subtreeIDs := append([]uint64{fragmentID}, childIDs...)
+			nodeToSubtreeFragmentIDs[current] = subtreeIDs
+		} else {
+			nodeToSubtreeFragmentIDs[current] = childIDs
+		}
+
+		return false
+	})
+
+	if len(fragments) > 0 {
+		return fragments, nil
+	} else {
+		// for non-query API calls
+		// --> treat as root fragment and immediately return the result
+		return []Fragment{{
 			Node:       node,
-			FragmentID: uint64(1),
+			FragmentID: uint64(0),
 			ChildIDs:   []uint64{},
 			IsRoot:     true,
-		},
-	}, nil
+		}}, nil
+	}
 }
 
 type Fragment struct {
@@ -47,6 +123,6 @@ func (s *Fragment) IsEmpty() bool {
 	return true
 }
 
-func NewDummyFragmenter() Fragmenter {
-	return &DummyFragmenter{}
+func NewPlanFragmenter() Fragmenter {
+	return &PlanFragmenter{}
 }
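
Taken together with the test helper below, a minimal usage sketch of the new fragmenter (the query, times, and query ID are illustrative; error handling is shortened). Fragment walks the optimized plan bottom-up, mints an ID for every Remote node plus the root, and returns the fragments child-to-root:

package main

import (
	"fmt"
	"time"

	"github.com/cortexproject/cortex/pkg/distributed_execution"
	"github.com/cortexproject/cortex/pkg/distributed_execution/plan_fragments"
)

func main() {
	now := time.Now()
	// build an optimized plan with the relocated test helper
	plan, err := distributed_execution.CreateTestLogicalPlan(
		`sum(rate(a[5m])) + sum(rate(b[5m]))`, now, now, 0)
	if err != nil {
		panic(err)
	}

	fragmenter := plan_fragments.NewPlanFragmenter()
	fragments, err := fragmenter.Fragment(uint64(1), (*plan).Root())
	if err != nil {
		panic(err)
	}

	// fragments arrive child-to-root, so slice order is scheduling order
	for _, f := range fragments {
		fmt.Printf("fragment=%d root=%t children=%v\n", f.FragmentID, f.IsRoot, f.ChildIDs)
	}
}

Note the design choice in getNewID: IDs are the top eight bytes of a random UUID, so they are effectively unique per process rather than sequential, and fragment relationships must be tracked through ChildIDs rather than by arithmetic on the IDs.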

pkg/distributed_execution/plan_fragments/fragmenter_test.go

Lines changed: 26 additions & 6 deletions
@@ -6,9 +6,12 @@ import (
 
 	"github.com/stretchr/testify/require"
 
-	"github.com/cortexproject/cortex/pkg/util/logical_plan"
+	"github.com/cortexproject/cortex/pkg/distributed_execution"
 )
 
+// Tests fragmentation of logical plans, verifying that the fragments contain correct metadata.
+// Note: The number of fragments is determined by the distributed optimizer's strategy -
+// if the optimizer logic changes, this test will need to be updated accordingly.
 func TestFragmenter(t *testing.T) {
 	type testCase struct {
 		name string
@@ -19,8 +22,6 @@ func TestFragmenter(t *testing.T) {
 	}
 
 	now := time.Now()
-
-	// more tests will be added when distributed optimizer and fragmenter are implemented
 	tests := []testCase{
 		{
 			name: "simple logical query plan - no fragmentation",
@@ -29,18 +30,37 @@ func TestFragmenter(t *testing.T) {
 			end:               now,
 			expectedFragments: 1,
 		},
+		{
+			name:              "binary operation with aggregations",
+			query:             "sum(rate(node_cpu_seconds_total{mode!=\"idle\"}[5m])) + sum(rate(node_memory_Active_bytes[5m]))",
+			start:             now,
+			end:               now,
+			expectedFragments: 3,
+		},
+		{
+			name:              "multiple binary operation with aggregations",
+			query:             "sum(rate(http_requests_total{job=\"api\"}[5m])) + sum(rate(http_requests_total{job=\"web\"}[5m])) + sum(rate(http_requests_total{job=\"cache\"}[5m])) + sum(rate(http_requests_total{job=\"db\"}[5m]))",
+			start:             now,
+			end:               now,
+			expectedFragments: 7,
+		},
 	}
 
 	for _, tc := range tests {
 		t.Run(tc.name, func(t *testing.T) {
-			lp, err := logical_plan.CreateTestLogicalPlan(tc.query, tc.start, tc.end, 0)
+			lp, err := distributed_execution.CreateTestLogicalPlan(tc.query, tc.start, tc.end, 0)
 			require.NoError(t, err)
 
-			fragmenter := NewDummyFragmenter()
-			res, err := fragmenter.Fragment((*lp).Root())
+			fragmenter := NewPlanFragmenter()
+			res, err := fragmenter.Fragment(uint64(1), (*lp).Root())
 
 			require.NoError(t, err)
 			require.Equal(t, tc.expectedFragments, len(res))
+
+			// check the metadata of the fragments of binary expressions
+			if len(res) == 3 {
+				require.Equal(t, []uint64{res[0].FragmentID, res[1].FragmentID}, res[2].ChildIDs)
+			}
 		})
 	}
 }
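
The expected counts follow from the optimizer's strategy: each aggregation-bearing binary node contributes two remote fragments (one per child), and the root plan adds one more. Assuming PromQL's left-associative parsing of the chained + operators, a hypothetical helper (not part of this commit) makes the arithmetic explicit:

// expectedFragmentCount is a hypothetical illustration, not committed code:
// a left-deep chain of n aggregations joined by binary operators produces
// 2*(n-1) remote fragments plus the root fragment.
func expectedFragmentCount(nAggregations int) int {
	if nAggregations < 2 {
		return 1 // no binary aggregation, the plan stays one root fragment
	}
	return 2*(nAggregations-1) + 1 // n=2 -> 3, n=4 -> 7, matching the cases above
}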

pkg/util/logical_plan/test_logicalplan_utils.go renamed to pkg/distributed_execution/test_logicalplan_utils.go

Lines changed: 2 additions & 2 deletions
@@ -1,4 +1,4 @@
-package logical_plan
+package distributed_execution
 
 import (
 	"time"
@@ -44,7 +44,7 @@ func CreateTestLogicalPlan(qs string, start time.Time, end time.Time, step time.
 	if err != nil {
 		return nil, err
 	}
-	optimizedPlan, _ := logicalPlan.Optimize(logicalplan.DefaultOptimizers)
+	optimizedPlan, _ := logicalPlan.Optimize(append(logicalplan.DefaultOptimizers, &DistributedOptimizer{}))
 
 	return &optimizedPlan, nil
 }

pkg/scheduler/scheduler.go

Lines changed: 2 additions & 2 deletions
@@ -125,7 +125,7 @@ func NewScheduler(cfg Config, limits Limits, log log.Logger, registerer promethe
 		connectedFrontends: map[string]*connectedFrontend{},
 
 		fragmentTable:          fragment_table.NewFragmentTable(2 * time.Minute),
-		fragmenter:             plan_fragments.NewDummyFragmenter(),
+		fragmenter:             plan_fragments.NewPlanFragmenter(),
 		distributedExecEnabled: distributedExecEnabled,
 		queryFragmentRegistry:  map[queryKey][]uint64{},
 	}
@@ -351,7 +351,7 @@ func (s *Scheduler) fragmentAndEnqueueRequest(frontendContext context.Context, f
 		return err
 	}
 
-	fragments, err := s.fragmenter.Fragment(lpNode)
+	fragments, err := s.fragmenter.Fragment(msg.QueryID, lpNode)
 	if err != nil {
 		return err
 	}
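
Threading msg.QueryID through to the fragmenter lets MakeFragmentKey scope fragment IDs to a single query. The enqueue step is outside this hunk; a rough sketch of how the child-to-root ordering would be consumed, with enqueueFragment as a hypothetical stand-in for the real enqueue path:

// sketch only; enqueueFragment is a hypothetical name, the real enqueue
// path is not shown in this diff
fragments, err := s.fragmenter.Fragment(msg.QueryID, lpNode)
if err != nil {
	return err
}
for _, f := range fragments {
	// children come first, so their results exist before parents run
	if err := s.enqueueFragment(frontendContext, f); err != nil {
		return err
	}
}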
