Skip to content

Commit 5740f8e

Browse files
committed
Introduce pluggable intra-flow dispatch framework
This commit introduces the `IntraFlowDispatchPolicy` framework, the second major component of the new pluggable flow control system. This framework decouples the logic for selecting a request from within a single flow's queue (temporal scheduling) from the underlying queue data structure. Key components include: - `framework.IntraFlowDispatchPolicy`: The core interface that defines the contract for selecting an item from a flow's queue. - `framework.FlowQueueAccessor`: A read-only interface that provides policies with safe access to queue state. - `RequiredQueueCapabilities`: A mechanism for policies to declare their queue requirements (e.g., FIFO, priority-ordered), which are validated by the registry. - A factory and registration system for discovering and instantiating policy plugins by name. - A comprehensive conformance test suite to validate the contract for all policy plugins. - A foundational `FCFS` (First-Come, First-Served) policy as the first reference implementation. This work builds directly on the `SafeQueue` framework, enabling the development of sophisticated, policy-driven request prioritization and scheduling.
1 parent f486883 commit 5740f8e

File tree

11 files changed

+556
-25
lines changed

11 files changed

+556
-25
lines changed

pkg/epp/flowcontrol/framework/doc.go

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,13 @@ limitations under the License.
2020
// to. By building on these interfaces, the Flow Control system can be extended and customized without modifying the
2121
// core controller logic.
2222
//
23-
// The primary interfaces defined here are:
24-
// - `SafeQueue`: The contract for concurrent-safe queue implementations.
25-
// - `ItemComparator`: The contract for policy-driven logic that defines the relative priority of items within a
23+
// The primary contracts are:
24+
// - `SafeQueue`: An interface for concurrent-safe queue implementations.
25+
// - `IntraFlowDispatchPolicy`: An interface for policies that decide which item to select from within a single flow's
2626
// queue.
27+
// - `ItemComparator`: An interface vended by policies to make their internal item-ordering logic explicit and
28+
// available to other components.
29+
//
30+
// These components are linked by `QueueCapability`, which allows policies to declare their queue requirements (e.g.,
31+
// FIFO or priority-based ordering).
2732
package framework

pkg/epp/flowcontrol/framework/mocks/mocks.go

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ package mocks
1818

1919
import (
2020
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework"
21+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/types"
2122
)
2223

2324
// MockItemComparator provides a mock implementation of the `framework.ItemComparator` interface.
@@ -30,3 +31,35 @@ func (m *MockItemComparator) Func() framework.ItemComparatorFunc { return m.Func
3031
func (m *MockItemComparator) ScoreType() string { return m.ScoreTypeV }
3132

3233
var _ framework.ItemComparator = &MockItemComparator{}
34+
35+
// MockFlowQueueAccessor is a mock implementation of the `framework.FlowQueueAccessor` interface.
36+
type MockFlowQueueAccessor struct {
37+
NameV string
38+
CapabilitiesV []framework.QueueCapability
39+
LenV int
40+
ByteSizeV uint64
41+
PeekHeadV types.QueueItemAccessor
42+
PeekHeadErrV error
43+
PeekTailV types.QueueItemAccessor
44+
PeekTailErrV error
45+
FlowSpecV types.FlowSpecification
46+
ComparatorV framework.ItemComparator
47+
}
48+
49+
func (m *MockFlowQueueAccessor) Name() string { return m.NameV }
50+
func (m *MockFlowQueueAccessor) Capabilities() []framework.QueueCapability { return m.CapabilitiesV }
51+
func (m *MockFlowQueueAccessor) Len() int { return m.LenV }
52+
func (m *MockFlowQueueAccessor) ByteSize() uint64 { return m.ByteSizeV }
53+
54+
func (m *MockFlowQueueAccessor) PeekHead() (types.QueueItemAccessor, error) {
55+
return m.PeekHeadV, m.PeekHeadErrV
56+
}
57+
58+
func (m *MockFlowQueueAccessor) PeekTail() (types.QueueItemAccessor, error) {
59+
return m.PeekTailV, m.PeekTailErrV
60+
}
61+
62+
func (m *MockFlowQueueAccessor) Comparator() framework.ItemComparator { return m.ComparatorV }
63+
func (m *MockFlowQueueAccessor) FlowSpec() types.FlowSpecification { return m.FlowSpecV }
64+
65+
var _ framework.FlowQueueAccessor = &MockFlowQueueAccessor{}
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
# Flow Controller Intra-Flow Dispatch Policy Plugins
2+
3+
This directory contains concrete implementations of the [`framework.IntraFlowDispatchPolicy`](../../../policies.go)
4+
interface. These policies are responsible for **temporal scheduling**: determining the order in which requests are
5+
selected for dispatch *from within a single flow's queue*.
6+
7+
## Overview
8+
9+
The `controller.FlowController` uses a two-tier policy system to manage requests. `framework.IntraFlowDispatchPolicy`
10+
plugins represent the first tier, making tactical decisions about the ordering of requests *within* a single logical
11+
flow (e.g., for a specific model or tenant).
12+
13+
This contrasts with the `framework.InterFlowDispatchPolicy` (not yet implemented), which is responsible for
14+
**spatial fairness**: deciding *which flow's queue* gets the next opportunity to dispatch a request. The
15+
`framework.IntraFlowDispatchPolicy` only operates *after* the inter-flow policy has selected a specific queue.
16+
17+
Key responsibilities and characteristics of an `IntraFlowDispatchPolicy`:
18+
19+
1. **Request Selection (`SelectItem`)**: The primary method, `SelectItem(queue framework.FlowQueueAccessor)`, inspects
20+
the given flow's queue (via a read-only accessor) and decides which item, if any, should be dispatched next from
21+
*that specific queue*.
22+
23+
2. **Priority Definition (`ItemComparator`)**:
24+
- This policy type is unique because it defines the nature of priority for items *within its specific managed
25+
queue*. It makes this logic explicit by vending a [`framework.ItemComparator`](../../../policies.go).
26+
- The vended comparator defines the "less than" relationship between two items and exposes a `ScoreType()` string
27+
(e.g., `"enqueue_time_ns_asc"`, `"slo_deadline_urgency"`) that gives a semantic meaning to the comparison.
28+
29+
3. **Queue Compatibility (`RequiredQueueCapabilities`)**: The policy specifies the capabilities its associated
30+
[`framework.SafeQueue`](../../../queue.go) must support for it to function correctly. For example, a simple FCFS
31+
policy would require `framework.CapabilityFIFO`, while a more complex, priority-based policy would require
32+
`framework.CapabilityPriorityConfigurable`. The `ports.FlowRegistry` uses this information to pair policies with
33+
compatible queues.
34+
35+
The `IntraFlowDispatchPolicy` allows for fine-grained control over how individual requests within a single flow are
36+
serviced, enabling strategies like basic FCFS or more advanced schemes based on SLOs or deadlines.
37+
38+
## Contributing a New `IntraFlowDispatchPolicy` Implementation
39+
40+
To contribute a new dispatch policy implementation, follow these steps:
41+
42+
1. **Define Your Implementation**
43+
- Create a new Go package in a subdirectory (e.g., `mycustompolicy/`).
44+
- Implement the `framework.IntraFlowDispatchPolicy` interface.
45+
- Ensure all methods are goroutine-safe if your policy maintains any internal state.
46+
47+
2. **Register Your Policy**
48+
- In an `init()` function within your policy's Go file, call [`MustRegisterPolicy()`](./factory.go) with a
49+
unique name and a constructor function that matches the `PolicyConstructor` signature.
50+
51+
3. **Add to the Conformance Test**
52+
- Add a blank import for your new package to [`conformance_test.go`](./conformance_test.go). Your policy will then
53+
be automatically included in the conformance suite, which validates the basic `framework.IntraFlowDispatchPolicy`
54+
contract (e.g., correct initialization, handling of nil/empty queues).
55+
56+
4. **Add Policy-Specific Tests**
57+
- The conformance suite only validates the universal contract. You MUST add a separate `_test.go` file within your
58+
package to test the specific logic of your policy.
59+
- For example, your tests should validate that your `Comparator()` works as expected and that `SelectItem()`
60+
correctly implements your desired selection logic for a non-empty queue.
61+
62+
5. **Documentation**
63+
- Add a package-level GoDoc comment to your new policy's Go file, explaining its behavior and any trade-offs.
Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
/*
2+
Copyright 2025 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package dispatch_test
18+
19+
import (
20+
"testing"
21+
22+
"github.com/stretchr/testify/assert"
23+
"github.com/stretchr/testify/require"
24+
25+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework"
26+
frameworkmocks "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/mocks"
27+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/policies/intraflow/dispatch"
28+
29+
_ "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/policies/intraflow/dispatch/fcfs"
30+
)
31+
32+
// TestIntraFlowDispatchPolicyConformance is the main conformance test suite for `framework.IntraFlowDispatchPolicy`
33+
// implementations.
34+
// It iterates over all policy implementations registered via `dispatch.MustRegisterPolicy` and runs a series of
35+
// sub-tests to ensure they adhere to the `framework.IntraFlowDispatchPolicy` contract.
36+
func TestIntraFlowDispatchPolicyConformance(t *testing.T) {
37+
t.Parallel()
38+
39+
for policyName, constructor := range dispatch.RegisteredPolicies {
40+
t.Run(string(policyName), func(t *testing.T) {
41+
t.Parallel()
42+
43+
policy, err := constructor()
44+
require.NoError(t, err, "Policy constructor for %s failed", policyName)
45+
require.NotNil(t, policy, "Constructor for %s should return a non-nil policy instance", policyName)
46+
47+
t.Run("Initialization", func(t *testing.T) {
48+
t.Parallel()
49+
assert.NotEmpty(t, policy.Name(), "Name() for %s should not be empty", policyName)
50+
51+
comp := policy.Comparator()
52+
require.NotNil(t, comp, "Comparator() for %s should not return nil", policyName)
53+
assert.NotNil(t, comp.Func(), "Comparator().Func() for %s should not be nil", policyName)
54+
assert.NotEmpty(t, comp.ScoreType(), "Comparator().ScoreType() for %s should not be empty", policyName)
55+
56+
caps := policy.RequiredQueueCapabilities()
57+
assert.NotNil(t, caps, "RequiredQueueCapabilities() for %s should not return a nil slice", policyName)
58+
})
59+
60+
t.Run("SelectItemFromNilQueue", func(t *testing.T) {
61+
t.Parallel()
62+
item, err := policy.SelectItem(nil)
63+
require.NoError(t, err, "SelectItem(nil) for %s should not return an error", policyName)
64+
assert.Nil(t, item, "SelectItem(nil) for %s should return a nil item", policyName)
65+
})
66+
67+
t.Run("SelectItemFromEmptyQueue", func(t *testing.T) {
68+
t.Parallel()
69+
mockQueue := &frameworkmocks.MockFlowQueueAccessor{
70+
PeekHeadErrV: framework.ErrQueueEmpty,
71+
LenV: 0,
72+
}
73+
item, err := policy.SelectItem(mockQueue)
74+
require.NoError(t, err, "SelectItem from an empty queue for %s should not return an error", policyName)
75+
assert.Nil(t, item, "SelectItem from an empty queue for %s should return a nil item", policyName)
76+
})
77+
})
78+
}
79+
}
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
/*
2+
Copyright 2025 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
// Package dispatch provides the factory and registration mechanism for all `IntraFlowDispatchPolicy` implementations.
18+
// It allows new policies to be added to the system and instantiated by name.
19+
package dispatch
20+
21+
import (
22+
"fmt"
23+
"sync"
24+
25+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework"
26+
)
27+
28+
// RegisteredPolicyName is the unique name under which a policy is registered.
29+
type RegisteredPolicyName string
30+
31+
// PolicyConstructor defines the function signature for creating a `framework.IntraFlowDispatchPolicy`.
32+
type PolicyConstructor func() (framework.IntraFlowDispatchPolicy, error)
33+
34+
var (
35+
// mu guards the registration map.
36+
mu sync.RWMutex
37+
// RegisteredPolicies stores the constructors for all registered policies.
38+
RegisteredPolicies = make(map[RegisteredPolicyName]PolicyConstructor)
39+
)
40+
41+
// MustRegisterPolicy registers a policy constructor, and panics if the name is already registered.
42+
// This is intended to be called from the `init()` function of a policy implementation.
43+
func MustRegisterPolicy(name RegisteredPolicyName, constructor PolicyConstructor) {
44+
mu.Lock()
45+
defer mu.Unlock()
46+
if _, ok := RegisteredPolicies[name]; ok {
47+
panic(fmt.Sprintf("IntraFlowDispatchPolicy already registered with name %q", name))
48+
}
49+
RegisteredPolicies[name] = constructor
50+
}
51+
52+
// NewPolicyFromName creates a new `IntraFlowDispatchPolicy` given its registered name.
53+
// This is called by the `registry.FlowRegistry` when configuring a flow.
54+
func NewPolicyFromName(name RegisteredPolicyName) (framework.IntraFlowDispatchPolicy, error) {
55+
mu.RLock()
56+
defer mu.RUnlock()
57+
constructor, ok := RegisteredPolicies[name]
58+
if !ok {
59+
return nil, fmt.Errorf("no IntraFlowDispatchPolicy registered with name %q", name)
60+
}
61+
return constructor()
62+
}
Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
/*
2+
Copyright 2025 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
// Package fcfs provides a First-Come, First-Served implementation of the `framework.IntraFlowDispatchPolicy`.
18+
package fcfs
19+
20+
import (
21+
"errors"
22+
23+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework"
24+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/policies/intraflow/dispatch"
25+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/types"
26+
)
27+
28+
// FCFSPolicyName is the name of the FCFS policy implementation.
29+
const FCFSPolicyName = "FCFS"
30+
31+
func init() {
32+
dispatch.MustRegisterPolicy(dispatch.RegisteredPolicyName(FCFSPolicyName),
33+
func() (framework.IntraFlowDispatchPolicy, error) {
34+
return newFCFS(), nil
35+
})
36+
}
37+
38+
// fcfs (First-Come, First-Served) implements the `framework.IntraFlowDispatchPolicy` interface.
39+
type fcfs struct {
40+
comparator framework.ItemComparator
41+
}
42+
43+
// newFCFS creates a new `fcfs` policy instance.
44+
func newFCFS() *fcfs {
45+
return &fcfs{
46+
comparator: &enqueueTimeComparator{},
47+
}
48+
}
49+
50+
// Name returns the name of the policy.
51+
func (p *fcfs) Name() string {
52+
return FCFSPolicyName
53+
}
54+
55+
// SelectItem selects the next item from the queue by peeking its head. This implementation relies on the queue being
56+
// ordered by dispatch preference, as indicated by its `RequiredQueueCapabilities`.
57+
func (p *fcfs) SelectItem(queue framework.FlowQueueAccessor) (types.QueueItemAccessor, error) {
58+
if queue == nil {
59+
return nil, nil
60+
}
61+
item, err := queue.PeekHead()
62+
if errors.Is(err, framework.ErrQueueEmpty) {
63+
return nil, nil
64+
}
65+
return item, err
66+
}
67+
68+
// Comparator returns a `framework.ItemComparator` based on enqueue time.
69+
func (p *fcfs) Comparator() framework.ItemComparator {
70+
return p.comparator
71+
}
72+
73+
// RequiredQueueCapabilities specifies that this policy needs a queue that supports FIFO operations.
74+
func (p *fcfs) RequiredQueueCapabilities() []framework.QueueCapability {
75+
return []framework.QueueCapability{framework.CapabilityFIFO}
76+
}
77+
78+
// --- enqueueTimeComparator ---
79+
80+
// enqueueTimeComparator implements `framework.ItemComparator` for FCFS logic.
81+
// It prioritizes items with earlier enqueue times.
82+
type enqueueTimeComparator struct{}
83+
84+
// Func returns the comparison logic.
85+
// It returns true if item 'a' should be dispatched before item 'b'.
86+
func (c *enqueueTimeComparator) Func() framework.ItemComparatorFunc {
87+
return func(a, b types.QueueItemAccessor) bool {
88+
if a == nil && b == nil {
89+
return false
90+
}
91+
if a == nil { // Treat nil as lowest priority
92+
return false
93+
}
94+
if b == nil { // Treat non-nil 'a' as higher priority than nil 'b'
95+
return true
96+
}
97+
return a.EnqueueTime().Before(b.EnqueueTime())
98+
}
99+
}
100+
101+
// ScoreType returns a string descriptor for the comparison logic.
102+
func (c *enqueueTimeComparator) ScoreType() string {
103+
return string(framework.EnqueueTimePriorityScoreType)
104+
}

0 commit comments

Comments
 (0)