diff --git a/pkg/epp/scheduling/framework/plugins/multi/prefix/plugin.go b/pkg/epp/scheduling/framework/plugins/multi/prefix/plugin.go index 7c6993ddb..46775b221 100644 --- a/pkg/epp/scheduling/framework/plugins/multi/prefix/plugin.go +++ b/pkg/epp/scheduling/framework/plugins/multi/prefix/plugin.go @@ -21,6 +21,7 @@ import ( "encoding/binary" "encoding/json" "fmt" + "sync" "github.com/cespare/xxhash/v2" k8stypes "k8s.io/apimachinery/pkg/types" @@ -78,6 +79,7 @@ type Plugin struct { config Config pluginState *plugins.PluginState indexer Indexer + wg sync.WaitGroup } // podSet holds an pods servers that may have a specific prefix hash. @@ -219,8 +221,11 @@ func (p *Plugin) PreRequest(ctx context.Context, request *types.LLMRequest, sche // This function is just adding data, it does not need to block other operations. // TODO: look into making this entire function async, none of this needs to be done in-band // The PR that introduces this change is meant as a cherrypick, so it was minimally invasive. + // WaitGroup is added to the Plugin struct to allow waiting in tests. + p.wg.Add(1) go func() { p.indexer.Add(state.PrefixHashes, ServerID(targetPod.NamespacedName)) + p.wg.Done() }() total := len(state.PrefixHashes) diff --git a/pkg/epp/scheduling/framework/plugins/multi/prefix/plugin_test.go b/pkg/epp/scheduling/framework/plugins/multi/prefix/plugin_test.go index d6ec43cbb..3fbac2ce1 100644 --- a/pkg/epp/scheduling/framework/plugins/multi/prefix/plugin_test.go +++ b/pkg/epp/scheduling/framework/plugins/multi/prefix/plugin_test.go @@ -71,6 +71,7 @@ func TestPrefixPlugin(t *testing.T) { }, } plugin.PreRequest(context.Background(), req1, schedulingResult, 0) + plugin.wg.Wait() // Second request doesn't share any prefix with first one. It should be added to the cache but // the pod score should be 0. @@ -98,6 +99,7 @@ func TestPrefixPlugin(t *testing.T) { }, } plugin.PreRequest(context.Background(), req2, schedulingResult, 0) + plugin.wg.Wait() // Third request shares partial prefix with first one. req3 := &types.LLMRequest{ @@ -123,6 +125,7 @@ func TestPrefixPlugin(t *testing.T) { }, } plugin.PreRequest(context.Background(), req3, schedulingResult, 0) + plugin.wg.Wait() // 4th request is same as req3 except the model is different, still no match. req4 := &types.LLMRequest{ @@ -148,6 +151,7 @@ func TestPrefixPlugin(t *testing.T) { }, } plugin.PreRequest(context.Background(), req4, schedulingResult, 0) + plugin.wg.Wait() // 5th request shares partial prefix with 3rd one. req5 := &types.LLMRequest{ @@ -173,6 +177,7 @@ func TestPrefixPlugin(t *testing.T) { }, } plugin.PreRequest(context.Background(), req5, schedulingResult, 0) + plugin.wg.Wait() } // TestPrefixPluginStress is a stress test for the prefix scoring plugin, using prompts of increasing length. @@ -220,6 +225,7 @@ func BenchmarkPrefixPluginStress(b *testing.B) { }, } plugin.PreRequest(context.Background(), req, schedulingResult, 0) + plugin.wg.Wait() // Second cycle: validate internal state state, err := plugins.ReadPluginStateKey[*SchedulingContextState](plugin.pluginState, req.RequestId, plugins.StateKey(plugin.TypedName().String()))