Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions pkg/epp/scheduling/framework/plugins/multi/prefix/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"encoding/binary"
"encoding/json"
"fmt"
"sync"

"github.com/cespare/xxhash/v2"
k8stypes "k8s.io/apimachinery/pkg/types"
Expand Down Expand Up @@ -78,6 +79,7 @@ type Plugin struct {
config Config
pluginState *plugins.PluginState
indexer Indexer
wg sync.WaitGroup
}

// podSet holds an pods servers that may have a specific prefix hash.
Expand Down Expand Up @@ -219,8 +221,11 @@ func (p *Plugin) PreRequest(ctx context.Context, request *types.LLMRequest, sche
// This function is just adding data, it does not need to block other operations.
// TODO: look into making this entire function async, none of this needs to be done in-band
// The PR that introduces this change is meant as a cherrypick, so it was minimally invasive.
// WaitGroup is added to the Plugin struct to allow waiting in tests.
p.wg.Add(1)
go func() {
p.indexer.Add(state.PrefixHashes, ServerID(targetPod.NamespacedName))
p.wg.Done()
Comment on lines +225 to +228
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

mmmmmm.
this fix introduces a wg only for test purpose. the thing is we don't have access to this wg outside of prefix plugin package, so llm-d tests will fail (we test PD, where prefix is used and profile handler uses it, reads state from CycleState, but with this go routine the update is not ready on time.
it solves the issue only in prefix unit tests

}()

total := len(state.PrefixHashes)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ func TestPrefixPlugin(t *testing.T) {
},
}
plugin.PreRequest(context.Background(), req1, schedulingResult, 0)
plugin.wg.Wait()

// Second request doesn't share any prefix with first one. It should be added to the cache but
// the pod score should be 0.
Expand Down Expand Up @@ -98,6 +99,7 @@ func TestPrefixPlugin(t *testing.T) {
},
}
plugin.PreRequest(context.Background(), req2, schedulingResult, 0)
plugin.wg.Wait()

// Third request shares partial prefix with first one.
req3 := &types.LLMRequest{
Expand All @@ -123,6 +125,7 @@ func TestPrefixPlugin(t *testing.T) {
},
}
plugin.PreRequest(context.Background(), req3, schedulingResult, 0)
plugin.wg.Wait()

// 4th request is same as req3 except the model is different, still no match.
req4 := &types.LLMRequest{
Expand All @@ -148,6 +151,7 @@ func TestPrefixPlugin(t *testing.T) {
},
}
plugin.PreRequest(context.Background(), req4, schedulingResult, 0)
plugin.wg.Wait()

// 5th request shares partial prefix with 3rd one.
req5 := &types.LLMRequest{
Expand All @@ -173,6 +177,7 @@ func TestPrefixPlugin(t *testing.T) {
},
}
plugin.PreRequest(context.Background(), req5, schedulingResult, 0)
plugin.wg.Wait()
}

// TestPrefixPluginStress is a stress test for the prefix scoring plugin, using prompts of increasing length.
Expand Down Expand Up @@ -220,6 +225,7 @@ func BenchmarkPrefixPluginStress(b *testing.B) {
},
}
plugin.PreRequest(context.Background(), req, schedulingResult, 0)
plugin.wg.Wait()

// Second cycle: validate internal state
state, err := plugins.ReadPluginStateKey[*SchedulingContextState](plugin.pluginState, req.RequestId, plugins.StateKey(plugin.TypedName().String()))
Expand Down