Skip to content

Commit 945ad9c

Browse files
committed
feat: Introduce the FlowController supervisor
This commit introduces the `FlowController`, a high-throughput, sharded supervisor that orchestrates a pool of stateful `ShardProcessor` workers. This new component is the central processing engine of the Flow Control system, implementing a "supervisor-worker" pattern. Key features of the `FlowController` include: - Supervisor-Worker Architecture: Acts as a stateless supervisor, managing the lifecycle of stateful `ShardProcessor` workers. It includes a reconciliation loop to garbage-collect workers for stale shards. - Flow-Aware Load Balancing: Implements a "Join-Shortest-Queue-by-Bytes" (JSQ-Bytes) algorithm to distribute incoming requests to the least-loaded worker, promoting emergent fairness. - Synchronous API: Exposes a blocking `EnqueueAndWait` method, which simplifies client integration (e.g., with Envoy `ext_proc`) and provides direct backpressure. - Lazy Worker Initialization: Workers are created on-demand when a shard shard first becomes active to conserve resources and reduce contention on the hot path. - Configuration: A new `Config` object allows for tuning parameters like TTLs, buffer sizes, and reconciliation intervals.
1 parent 65f52f0 commit 945ad9c

File tree

5 files changed

+1597
-98
lines changed

5 files changed

+1597
-98
lines changed
Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
/*
2+
Copyright 2025 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package controller
18+
19+
import (
20+
"fmt"
21+
"time"
22+
)
23+
24+
const (
25+
// defaultExpiryCleanupInterval is the default frequency for scanning for expired items.
26+
defaultExpiryCleanupInterval = 1 * time.Second
27+
// defaultProcessorReconciliationInterval is the default frequency for the supervisor loop.
28+
defaultProcessorReconciliationInterval = 5 * time.Second
29+
// defaultEnqueueChannelBufferSize is the default size of a worker's incoming request buffer.
30+
defaultEnqueueChannelBufferSize = 100
31+
)
32+
33+
// Config holds the configuration for the `FlowController`.
34+
type Config struct {
35+
// DefaultRequestTTL is the default Time-To-Live applied to requests that do not
36+
// specify their own TTL hint.
37+
// Optional: If zero, no TTL is applied by default and we rely solely on request context cancellation.
38+
DefaultRequestTTL time.Duration
39+
40+
// ExpiryCleanupInterval is the interval at which each shard processor scans its queues for expired items.
41+
// Optional: Defaults to `defaultExpiryCleanupInterval` (1 second).
42+
ExpiryCleanupInterval time.Duration
43+
44+
// ProcessorReconciliationInterval is the frequency at which the `FlowController`'s supervisor loop garbage collects
45+
// stale workers.
46+
// Optional: Defaults to `defaultProcessorReconciliationInterval` (5 seconds).
47+
ProcessorReconciliationInterval time.Duration
48+
49+
// EnqueueChannelBufferSize is the size of the buffered channel that accepts incoming requests for each shard
50+
// processor. This buffer acts as a shock absorber, decoupling the high-frequency distributor from the processor's
51+
// serial execution loop and allowing the system to handle short bursts of traffic without blocking.
52+
// Optional: Defaults to `defaultEnqueueChannelBufferSize` (100).
53+
EnqueueChannelBufferSize int
54+
}
55+
56+
// newConfig performs validation and initialization, returning a guaranteed-valid `Config` object.
57+
// This is the required constructor for creating a new configuration.
58+
// It does not mutate the input `cfg`.
59+
func newConfig(cfg Config) (*Config, error) {
60+
newCfg := cfg.deepCopy()
61+
if err := newCfg.validateAndApplyDefaults(); err != nil {
62+
return nil, err
63+
}
64+
return newCfg, nil
65+
}
66+
67+
// validateAndApplyDefaults checks the global configuration for validity and then mutates the receiver to populate any
68+
// empty fields with system defaults.
69+
func (c *Config) validateAndApplyDefaults() error {
70+
// --- Validation ---
71+
if c.DefaultRequestTTL < 0 {
72+
return fmt.Errorf("DefaultRequestTTL cannot be negative, but got %v", c.DefaultRequestTTL)
73+
}
74+
if c.ExpiryCleanupInterval < 0 {
75+
return fmt.Errorf("ExpiryCleanupInterval cannot be negative, but got %v", c.ExpiryCleanupInterval)
76+
}
77+
if c.ProcessorReconciliationInterval < 0 {
78+
return fmt.Errorf("ProcessorReconciliationInterval cannot be negative, but got %v",
79+
c.ProcessorReconciliationInterval)
80+
}
81+
if c.EnqueueChannelBufferSize < 0 {
82+
return fmt.Errorf("EnqueueChannelBufferSize cannot be negative, but got %d", c.EnqueueChannelBufferSize)
83+
}
84+
85+
// --- Defaulting ---
86+
if c.ExpiryCleanupInterval == 0 {
87+
c.ExpiryCleanupInterval = defaultExpiryCleanupInterval
88+
}
89+
if c.ProcessorReconciliationInterval == 0 {
90+
c.ProcessorReconciliationInterval = defaultProcessorReconciliationInterval
91+
}
92+
if c.EnqueueChannelBufferSize == 0 {
93+
c.EnqueueChannelBufferSize = defaultEnqueueChannelBufferSize
94+
}
95+
return nil
96+
}
97+
98+
// deepCopy creates a deep copy of the `Config` object.
99+
func (c *Config) deepCopy() *Config {
100+
if c == nil {
101+
return nil
102+
}
103+
newCfg := &Config{
104+
DefaultRequestTTL: c.DefaultRequestTTL,
105+
ExpiryCleanupInterval: c.ExpiryCleanupInterval,
106+
ProcessorReconciliationInterval: c.ProcessorReconciliationInterval,
107+
EnqueueChannelBufferSize: c.EnqueueChannelBufferSize,
108+
}
109+
return newCfg
110+
}
Lines changed: 135 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,135 @@
1+
/*
2+
Copyright 2025 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUTHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package controller
18+
19+
import (
20+
"testing"
21+
"time"
22+
23+
"github.com/stretchr/testify/assert"
24+
"github.com/stretchr/testify/require"
25+
)
26+
27+
func TestNewConfig(t *testing.T) {
28+
t.Parallel()
29+
30+
testCases := []struct {
31+
name string
32+
input Config
33+
expectErr bool
34+
expectedCfg Config
35+
shouldDefault bool
36+
}{
37+
{
38+
name: "ValidConfig_NoChanges",
39+
input: Config{
40+
DefaultRequestTTL: 10 * time.Second,
41+
ExpiryCleanupInterval: 2 * time.Second,
42+
ProcessorReconciliationInterval: 10 * time.Second,
43+
EnqueueChannelBufferSize: 200,
44+
},
45+
expectErr: false,
46+
expectedCfg: Config{
47+
DefaultRequestTTL: 10 * time.Second,
48+
ExpiryCleanupInterval: 2 * time.Second,
49+
ProcessorReconciliationInterval: 10 * time.Second,
50+
EnqueueChannelBufferSize: 200,
51+
},
52+
},
53+
{
54+
name: "EmptyConfig_ShouldApplyDefaults",
55+
input: Config{},
56+
expectErr: false,
57+
expectedCfg: Config{
58+
DefaultRequestTTL: 0,
59+
ExpiryCleanupInterval: defaultExpiryCleanupInterval,
60+
ProcessorReconciliationInterval: defaultProcessorReconciliationInterval,
61+
EnqueueChannelBufferSize: defaultEnqueueChannelBufferSize,
62+
},
63+
shouldDefault: true,
64+
},
65+
{
66+
name: "NegativeDefaultRequestTTL_Invalid",
67+
input: Config{DefaultRequestTTL: -1},
68+
expectErr: true,
69+
},
70+
{
71+
name: "NegativeExpiryCleanupInterval_Invalid",
72+
input: Config{ExpiryCleanupInterval: -1},
73+
expectErr: true,
74+
},
75+
{
76+
name: "NegativeProcessorReconciliationInterval_Invalid",
77+
input: Config{ProcessorReconciliationInterval: -1},
78+
expectErr: true,
79+
},
80+
{
81+
name: "NegativeEnqueueChannelBufferSize_Invalid",
82+
input: Config{EnqueueChannelBufferSize: -1},
83+
expectErr: true,
84+
},
85+
}
86+
87+
for _, tc := range testCases {
88+
t.Run(tc.name, func(t *testing.T) {
89+
t.Parallel()
90+
originalInput := tc.input.deepCopy()
91+
validatedCfg, err := newConfig(tc.input)
92+
93+
if tc.expectErr {
94+
require.Error(t, err, "expected an error but got nil")
95+
assert.Nil(t, validatedCfg, "validatedCfg should be nil on error")
96+
} else {
97+
require.NoError(t, err, "expected no error but got: %v", err)
98+
require.NotNil(t, validatedCfg, "validatedCfg should not be nil on success")
99+
assert.Equal(t, tc.expectedCfg, *validatedCfg, "validatedCfg should match expected config")
100+
101+
// Ensure the original config is not mutated.
102+
assert.Equal(t, *originalInput, tc.input, "input config should not be mutated")
103+
}
104+
})
105+
}
106+
}
107+
108+
func TestConfig_DeepCopy(t *testing.T) {
109+
t.Parallel()
110+
111+
t.Run("ShouldReturnNil_ForNilReceiver", func(t *testing.T) {
112+
t.Parallel()
113+
var nilConfig *Config
114+
assert.Nil(t, nilConfig.deepCopy(), "Deep copy of a nil config should be nil")
115+
})
116+
117+
t.Run("ShouldCreateIdenticalButSeparateObject", func(t *testing.T) {
118+
t.Parallel()
119+
original := &Config{
120+
DefaultRequestTTL: 1 * time.Second,
121+
ExpiryCleanupInterval: 2 * time.Second,
122+
ProcessorReconciliationInterval: 3 * time.Second,
123+
EnqueueChannelBufferSize: 4,
124+
}
125+
clone := original.deepCopy()
126+
127+
require.NotSame(t, original, clone, "Clone should be a new object in memory")
128+
assert.Equal(t, *original, *clone, "Cloned object should have identical values")
129+
130+
// Modify the clone and ensure the original is unchanged.
131+
clone.DefaultRequestTTL = 99 * time.Second
132+
assert.NotEqual(t, original.DefaultRequestTTL, clone.DefaultRequestTTL,
133+
"Original should not be mutated after clone is changed")
134+
})
135+
}

0 commit comments

Comments
 (0)