@@ -29,7 +29,6 @@ import (
2929 "fmt"
3030 "slices"
3131 "sync"
32- "sync/atomic"
3332 "time"
3433
3534 "github.com/go-logr/logr"
@@ -103,44 +102,16 @@ type FlowController struct {
103102 // The key is the shard ID (`string`), and the value is a `*managedWorker`.
104103 workers sync.Map
105104
106- // ready is closed by the Run method once initialization is complete, including setting the `parentCtx`.
107- // This acts as a memory barrier and synchronization point for all other concurrent methods.
108- ready chan struct {}
109-
110- isRunning atomic.Bool
111- wg sync.WaitGroup
105+ wg sync.WaitGroup
112106}
113107
114108// flowControllerOption is a function that applies a configuration change to a `FlowController`.
115109// test-only
116110type flowControllerOption func (* FlowController )
117111
118- // withClock returns a test-only option to inject a clock.
119- // test-only
120- func withClock (c clock.WithTicker ) flowControllerOption {
121- return func (fc * FlowController ) {
122- fc .clock = c
123- }
124- }
125-
126- // withRegistryClient returns a test-only option to inject a mock or fake registry client.
127- // test-only
128- func withRegistryClient (client registryClient ) flowControllerOption {
129- return func (fc * FlowController ) {
130- fc .registry = client
131- }
132- }
133-
134- // withShardProcessorFactory returns a test-only option to inject a processor factory.
135- // test-only
136- func withShardProcessorFactory (factory shardProcessorFactory ) flowControllerOption {
137- return func (fc * FlowController ) {
138- fc .shardProcessorFactory = factory
139- }
140- }
141-
142112// NewFlowController creates a new `FlowController` instance.
143113func NewFlowController (
114+ ctx context.Context ,
144115 config Config ,
145116 registry contracts.FlowRegistry ,
146117 sd contracts.SaturationDetector ,
@@ -158,8 +129,7 @@ func NewFlowController(
158129 saturationDetector : sd ,
159130 clock : clock.RealClock {},
160131 logger : logger .WithName ("flow-controller" ),
161- parentCtx : context .Background (), // Will be set in `Run`
162- ready : make (chan struct {}),
132+ parentCtx : ctx ,
163133 }
164134
165135 // Use the real shard processor implementation by default.
@@ -183,29 +153,21 @@ func NewFlowController(
183153 for _ , opt := range opts {
184154 opt (fc )
185155 }
156+
157+ go fc .run (ctx )
186158 return fc , nil
187159}
188160
189- // Run starts the `FlowController`'s main reconciliation loop.
161+ // run starts the `FlowController`'s main reconciliation loop.
190162// This loop is responsible for garbage collecting workers whose shards no longer exist in the registry.
191163// This method blocks until the provided context is cancelled and ALL worker goroutines have fully terminated.
192- func (fc * FlowController ) Run (ctx context.Context ) {
193- if ! fc .isRunning .CompareAndSwap (false , true ) {
194- fc .logger .Error (nil , "FlowController Run loop already started or controller is shut down" )
195- return
196- }
197-
198- fc .parentCtx = ctx
199- close (fc .ready )
200-
164+ func (fc * FlowController ) run (ctx context.Context ) {
201165 fc .logger .Info ("Starting FlowController reconciliation loop." )
202166 defer fc .logger .Info ("FlowController reconciliation loop stopped." )
203167
204168 ticker := fc .clock .NewTicker (fc .config .ProcessorReconciliationInterval )
205169 defer ticker .Stop ()
206170
207- fc .reconcileProcessors () // Initial reconciliation
208-
209171 for {
210172 select {
211173 case <- ctx .Done ():
@@ -234,7 +196,7 @@ func (fc *FlowController) Run(ctx context.Context) {
234196// backpressure to the caller.
235197func (fc * FlowController ) EnqueueAndWait (req types.FlowControlRequest ) (types.QueueOutcome , error ) {
236198 if req == nil {
237- return types .QueueOutcomeRejectedOther , fmt . Errorf ( "%w: %w" , types . ErrRejected , types . ErrNilRequest )
199+ return types .QueueOutcomeRejectedOther , errors . New ( "request cannot be nil" )
238200 }
239201 effectiveTTL := req .InitialEffectiveTTL ()
240202 if effectiveTTL <= 0 {
@@ -243,18 +205,23 @@ func (fc *FlowController) EnqueueAndWait(req types.FlowControlRequest) (types.Qu
243205 enqueueTime := fc .clock .Now ()
244206
245207 for {
246- if ! fc .isRunning .Load () {
208+ select {
209+ case <- fc .parentCtx .Done ():
247210 return types .QueueOutcomeRejectedOther , fmt .Errorf ("%w: %w" , types .ErrRejected , types .ErrFlowControllerNotRunning )
211+ default :
212+ // The controller is running, proceed.
248213 }
249214
250215 // We must create a fresh `FlowItem` on each attempt: finalization is idempotent, so a finalized item cannot be reused.
251- // However, it we use the original, preserved `enqueueTime`.
216+ // However, we use the original, preserved `enqueueTime`.
252217 item := internal .NewItem (req , effectiveTTL , enqueueTime )
253218 if outcome , err := fc .distributeRequest (item ); err != nil {
254219 return outcome , fmt .Errorf ("%w: %w" , types .ErrRejected , err )
255220 }
256221
257- finalState := <- item .Done () // finalization handles monitoring request context cancellation and TTL expiry
222+ // Block until the request is finalized (dispatched, rejected, or evicted).
223+ // The finalization logic internally monitors for context cancellation and TTL expiry.
224+ finalState := <- item .Done ()
258225 if errors .Is (finalState .Err , contracts .ErrShardDraining ) {
259226 fc .logger .V (logutil .DEBUG ).Info ("Shard is draining, retrying request" , "requestID" , req .ID ())
260227 // Benign race with the chosen `contracts.RegistryShard` becoming Draining post selection but before the item was
@@ -266,65 +233,16 @@ func (fc *FlowController) EnqueueAndWait(req types.FlowControlRequest) (types.Qu
266233 }
267234}
268235
269- // distributeRequest selects the optimal shard processor for a given item and attempts to submit it.
270- //
271- // # Architectural Deep Dive: Achieving Emergent Fairness with Data Parallelism
272- //
273- // To achieve high throughput and prevent a central bottleneck, the `FlowController` is built on a sharded,
274- // data-parallel architecture. It runs multiple `internal`ShardProcessor` workers, and every logical flow is represented
275- // by a dedicated queue on every Active shard. This design grants the distributor maximum flexibility to route traffic
276- // based on real-time load.
277- //
278- // This function implements a sophisticated distribution strategy: Flow-Aware, Two-Phase Join-Shortest-Queue-by-Bytes
279- // (JSQ-Bytes) with Graceful Backpressure. It is designed to balance load, prevent unnecessary rejections under
280- // transient spikes, and create the necessary conditions for global fairness goals to emerge from local, independent
281- // worker actions.
282- //
283- // # The Algorithm in Detail
284- //
285- // 1. Flow-Aware Candidate Selection: For an incoming request, the controller inspects the queue depth (in bytes) for
286- // that specific flow across all Active shards. It then sorts these shards from least-loaded to most-loaded,
287- // creating a ranked list of candidates.
288- // 2. Phase 1 (Non-blocking Fast Failover): The controller iterates through the sorted candidates and attempts a
289- // non-blocking `Submit()` to each. If any processor accepts the item, the operation succeeds immediately.
290- // This prevents a single, momentarily busy worker from stalling the entire system.
291- // 3. Phase 2 (Blocking Fallback): If all processors are busy, it attempts a single, blocking `SubmitOrBlock()` on the
292- // single best candidate. This provides graceful backpressure and increases the likelihood of success during
293- // transient traffic bursts.
236+ // distributeRequest implements a flow-aware, two-phase "Join-Shortest-Queue-by-Bytes" (JSQ-Bytes) distribution strategy
237+ // with graceful backpressure. It selects the optimal worker for a given item and attempts to submit it.
294238//
295- // # Design Rationale and Critical Assumptions
296- //
297- // ### 1. The Flow Homogeneity Assumption
298- //
299- // The first assumption is that traffic within a single logical flow is roughly homogeneous. The `types.FlowKey` is
300- // the primary mechanism for grouping requests that are expected to have statistically similar behavior (e.g., prefill,
301- // decode). For this to be effective, a flow must meaningfully represent a single workload (e.g., the same model, user
302- // cohort, or task type). The more closely this assumption is satisfied in practice, the more stable and predictable the
303- // system dynamics will be.
304- //
305- // ### Robustness Through Real-Time Adaptation
306- //
307- // The system is designed to be robust even when the homogeneity assumption is imperfect. The distribution algorithm
308- // does not need to predict workload characteristics; it only needs to react to their consequences in real time.
309- // If a shard becomes slow or congested, the backlogs of its queues will grow. The JSQ-Bytes algorithm will naturally
310- // observe this increase in byte size and adaptively steer new work away from the congested shard.
311- //
312- // ### 2. The Shard Homogeneity Constraint (Enabling Stateful Policies)
313- //
314- // The second, and most critical, constraint of this data-parallel design relates to the policies executed by the
315- // workers. The fairness (`InterFlowDispatchPolicy`) and temporal scheduling (`IntraFlowDispatchPolicy`) policies may be
316- // stateful (e.g., a fairness algorithm tracking historical tokens served).
317- //
318- // For the independent decisions of these stateful policies to result in coherent, globally fair outcomes, the state
319- // they observe on each shard must be statistically similar. This is the shard homogeneity constraint.
320- //
321- // This constraint is actively enforced by the Flow-Aware JSQ-Bytes algorithm. By constantly balancing the load for each
322- // flow individually, the distributor ensures that, over time, the mix of traffic on each shard is roughly proportional.
323- // It actively prevents one shard from becoming specialized in serving a single, dominant flow.
324- //
325- // This creates the necessary foundation for our model: local, stateful policy decisions, when aggregated across
326- // statistically similar shards, result in an emergent, approximately correct global fairness objective. This is key to
327- // unlocking scalable performance without a central, bottlenecked scheduler.
239+ // The algorithm operates as follows:
240+ // 1. Candidate Selection: It identifies all Active shards for the item's flow and ranks them by the current byte size
241+ // of that flow's queue, from least to most loaded.
242+ // 2. Phase 1 (Non-blocking Fast Failover): It iterates through the ranked candidates and attempts a non-blocking
243+ // submission. The first successful submission wins.
244+ // 3. Phase 2 (Blocking Fallback): If all non-blocking attempts fail, it performs a single blocking submission to the
245+ // least-loaded candidate, providing backpressure.
328246func (fc * FlowController ) distributeRequest (item * internal.FlowItem ) (types.QueueOutcome , error ) {
329247 key := item .OriginalRequest ().FlowKey ()
330248 reqID := item .OriginalRequest ().ID ()
@@ -349,7 +267,7 @@ func (fc *FlowController) distributeRequest(item *internal.FlowItem) (types.Queu
349267 return nil
350268 })
351269 if err != nil {
352- return types .QueueOutcomeRejectedOther , fmt .Errorf ("failed to establish connection for request %q (flow %s): %w" ,
270+ return types .QueueOutcomeRejectedOther , fmt .Errorf ("failed to acquire lease for request %q (flow %s): %w" ,
353271 reqID , key , err )
354272 }
355273
@@ -387,30 +305,15 @@ func (fc *FlowController) distributeRequest(item *internal.FlowItem) (types.Queu
387305}
388306
389307// getOrStartWorker implements the lazy-loading and startup of shard processors.
390- // It attempts to retrieve an existing worker for a shard, and if one doesn't exist, it creates, starts, and
391- // registers it atomically.
392- // This ensures that workers are only created on-demand when a shard first becomes Active .
308+ // It attempts to retrieve an existing worker for a shard. If one doesn't exist, it constructs a new worker and attempts
309+ // to register it atomically. The worker's processor goroutine is only started *after* it has successfully been
310+ // registered, preventing race conditions where multiple goroutines create and start the same worker.
393311func (fc * FlowController ) getOrStartWorker (shard contracts.RegistryShard ) * managedWorker {
394312 if w , ok := fc .workers .Load (shard .ID ()); ok {
395313 return w .(* managedWorker )
396314 }
397315
398- // Atomically load or store.
399- // This handles the race condition where multiple goroutines try to create the same worker.
400- newWorker := fc .startNewWorker (shard )
401- actual , loaded := fc .workers .LoadOrStore (shard .ID (), newWorker )
402- if loaded {
403- // Another goroutine beat us to it; the `newWorker` we created was not stored.
404- // We must clean it up immediately to prevent resource leaks.
405- newWorker .cancel ()
406- return actual .(* managedWorker )
407- }
408- return newWorker
409- }
410-
411- // startNewWorker encapsulates the logic for creating and starting a new worker goroutine.
412- func (fc * FlowController ) startNewWorker (shard contracts.RegistryShard ) * managedWorker {
413- <- fc .ready // We must wait until the parent context is initialized.
316+ // Construct a new worker, but do not start its processor goroutine yet.
414317 processorCtx , cancel := context .WithCancel (fc .parentCtx )
415318 dispatchFilter := internal .NewSaturationFilter (fc .saturationDetector )
416319 processor := fc .shardProcessorFactory (
@@ -421,19 +324,30 @@ func (fc *FlowController) startNewWorker(shard contracts.RegistryShard) *managed
421324 fc .config .EnqueueChannelBufferSize ,
422325 fc .logger .WithValues ("shardID" , shard .ID ()),
423326 )
424-
425- worker := & managedWorker {
327+ newWorker := & managedWorker {
426328 processor : processor ,
427329 cancel : cancel ,
428330 }
429331
332+ // Atomically load or store. This is the critical step for preventing race conditions.
333+ actual , loaded := fc .workers .LoadOrStore (shard .ID (), newWorker )
334+ if loaded {
335+ // Another goroutine beat us to it. The `newWorker` we created was not stored.
336+ // We must cancel the context we created for it to prevent a leak, but we do not need to do anything else, as its
337+ // processor was never started.
338+ cancel ()
339+ return actual .(* managedWorker )
340+ }
341+
342+ // We won the race. The `newWorker` was successfully stored.
343+ // Now, and only now, do we start the processor's long-running goroutine.
430344 fc .wg .Add (1 )
431345 go func () {
432346 defer fc .wg .Done ()
433347 processor .Run (processorCtx )
434348 }()
435349
436- return worker
350+ return newWorker
437351}
438352
439353// reconcileProcessors is the supervisor's core garbage collection loop.
@@ -463,7 +377,6 @@ func (fc *FlowController) reconcileProcessors() {
463377// shutdown gracefully terminates all running `shardProcessor` goroutines.
464378// It signals all workers to stop and waits for them to complete their shutdown procedures.
465379func (fc * FlowController ) shutdown () {
466- fc .isRunning .Store (false )
467380 fc .logger .Info ("Shutting down FlowController and all shard processors." )
468381 fc .workers .Range (func (key , value any ) bool {
469382 shardID := key .(string )
0 commit comments