Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 11 additions & 11 deletions pkg/epp/scheduling/framework/scheduler_profile.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,21 +128,21 @@ func (p *SchedulerProfile) Run(ctx context.Context, request *types.LLMRequest, c
}

func (p *SchedulerProfile) runFilterPlugins(ctx context.Context, request *types.LLMRequest, cycleState *types.CycleState, pods []types.Pod) []types.Pod {
loggerDebug := log.FromContext(ctx).V(logutil.DEBUG)
logger := log.FromContext(ctx)
filteredPods := pods
loggerDebug.Info("Before running filter plugins", "pods", filteredPods)
logger.V(logutil.DEBUG).Info("Before running filter plugins", "pods", filteredPods)

for _, filter := range p.filters {
loggerDebug.Info("Running filter plugin", "plugin", filter.TypedName())
logger.V(logutil.VERBOSE).Info("Running filter plugin", "plugin", filter.TypedName())
before := time.Now()
filteredPods = filter.Filter(ctx, cycleState, request, filteredPods)
metrics.RecordPluginProcessingLatency(FilterExtensionPoint, filter.TypedName().Type, filter.TypedName().Name, time.Since(before))
loggerDebug.Info("Completed running filter plugin successfully", "plugin", filter.TypedName(), "pods", filteredPods)
logger.V(logutil.DEBUG).Info("Completed running filter plugin successfully", "plugin", filter.TypedName(), "pods", filteredPods)
if len(filteredPods) == 0 {
break
}
}
loggerDebug.Info("Completed running filter plugins successfully")
logger.V(logutil.VERBOSE).Info("Completed running filter plugins successfully")

return filteredPods
}
Expand All @@ -157,7 +157,7 @@ func (p *SchedulerProfile) runScorerPlugins(ctx context.Context, request *types.
}
// Iterate through each scorer in the chain and accumulate the weighted scores.
for _, scorer := range p.scorers {
logger.V(logutil.DEBUG).Info("Running scorer plugin", "plugin", scorer.TypedName())
logger.V(logutil.VERBOSE).Info("Running scorer plugin", "plugin", scorer.TypedName())
before := time.Now()
scores := scorer.Score(ctx, cycleState, request, pods)
metrics.RecordPluginProcessingLatency(ScorerExtensionPoint, scorer.TypedName().Type, scorer.TypedName().Name, time.Since(before))
Expand All @@ -167,25 +167,25 @@ func (p *SchedulerProfile) runScorerPlugins(ctx context.Context, request *types.
}
logger.V(logutil.DEBUG).Info("Completed running scorer plugin successfully", "plugin", scorer.TypedName())
}
logger.V(logutil.DEBUG).Info("Completed running scorer plugins successfully")
logger.V(logutil.VERBOSE).Info("Completed running scorer plugins successfully")

return weightedScorePerPod
}

func (p *SchedulerProfile) runPickerPlugin(ctx context.Context, cycleState *types.CycleState, weightedScorePerPod map[types.Pod]float64) *types.ProfileRunResult {
loggerDebug := log.FromContext(ctx).V(logutil.DEBUG)
logger := log.FromContext(ctx)
scoredPods := make([]*types.ScoredPod, len(weightedScorePerPod))
i := 0
for pod, score := range weightedScorePerPod {
scoredPods[i] = &types.ScoredPod{Pod: pod, Score: score}
i++
}

loggerDebug.Info("Running picker plugin", "plugin", p.picker.TypedName(), "pods-weighted-score", weightedScorePerPod)
logger.V(logutil.VERBOSE).Info("Running picker plugin", "plugin", p.picker.TypedName())
logger.V(logutil.DEBUG).Info("Candidate pods for picking", "pods-weighted-score", scoredPods)
before := time.Now()
result := p.picker.Pick(ctx, cycleState, scoredPods)
metrics.RecordPluginProcessingLatency(PickerExtensionPoint, p.picker.TypedName().Type, p.picker.TypedName().Name, time.Since(before))
loggerDebug.Info("Completed running picker plugin successfully", "plugin", p.picker.TypedName(), "result", result)
logger.V(logutil.DEBUG).Info("Completed running picker plugin successfully", "plugin", p.picker.TypedName(), "result", result)

return result
}
Expand Down
17 changes: 8 additions & 9 deletions pkg/epp/scheduling/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,7 @@ type Scheduler struct {

// Schedule finds the target pod based on metrics and the requested lora adapter.
func (s *Scheduler) Schedule(ctx context.Context, request *types.LLMRequest, candidatePods []types.Pod) (*types.SchedulingResult, error) {
logger := log.FromContext(ctx).WithValues("requestId", request.RequestId, "targetModel", request.TargetModel)
loggerDebug := logger.V(logutil.DEBUG)
loggerVerbose := log.FromContext(ctx).V(logutil.VERBOSE)

scheduleStart := time.Now()
defer func() {
Expand All @@ -57,23 +56,23 @@ func (s *Scheduler) Schedule(ctx context.Context, request *types.LLMRequest, can
cycleState := types.NewCycleState()

for { // get the next set of profiles to run iteratively based on the request and the previous execution results
loggerDebug.Info("Running profile handler, Pick profiles", "plugin", s.profileHandler.TypedName())
loggerVerbose.Info("Running profile handler, Pick profiles", "plugin", s.profileHandler.TypedName())
before := time.Now()
profiles := s.profileHandler.Pick(ctx, cycleState, request, s.profiles, profileRunResults)
metrics.RecordPluginProcessingLatency(framework.ProfilePickerExtensionPoint, s.profileHandler.TypedName().Type, s.profileHandler.TypedName().Name, time.Since(before))
loggerDebug.Info("Completed running profile handler Pick profiles successfully", "plugin", s.profileHandler.TypedName(), "result", profiles)
loggerVerbose.Info("Completed running profile handler Pick profiles successfully", "plugin", s.profileHandler.TypedName(), "result", profiles)
if len(profiles) == 0 { // profile picker didn't pick any profile to run
break
}

for name, profile := range profiles {
loggerDebug.Info("Running scheduler profile", "name", name)
loggerVerbose.Info("Running scheduler profile", "profile", name)
// run the selected profiles and collect results (current code runs all profiles)
profileRunResult, err := profile.Run(ctx, request, cycleState, candidatePods)
if err != nil {
loggerDebug.Info("failed to run scheduler profile", "profile", name, "error", err.Error())
loggerVerbose.Info("failed to run scheduler profile", "profile", name, "error", err.Error())
} else {
loggerDebug.Info("Completed running scheduler profile succuessfully", "name", name)
loggerVerbose.Info("Completed running scheduler profile succuessfully", "profile", name)
}

profileRunResults[name] = profileRunResult // if profile failed to run, the run result is nil
Expand All @@ -84,11 +83,11 @@ func (s *Scheduler) Schedule(ctx context.Context, request *types.LLMRequest, can
return nil, fmt.Errorf("failed to run any scheduler profile for request %s", request.RequestId)
}

loggerDebug.Info("Running profile handler, ProcessResults", "plugin", s.profileHandler.TypedName())
loggerVerbose.Info("Running profile handler, ProcessResults", "plugin", s.profileHandler.TypedName())
before := time.Now()
result, err := s.profileHandler.ProcessResults(ctx, cycleState, request, profileRunResults)
metrics.RecordPluginProcessingLatency(framework.ProcessProfilesResultsExtensionPoint, s.profileHandler.TypedName().Type, s.profileHandler.TypedName().Name, time.Since(before))
loggerDebug.Info("Completed running profile handler ProcessResults successfully", "plugin", s.profileHandler.TypedName())
loggerVerbose.Info("Completed running profile handler ProcessResults successfully", "plugin", s.profileHandler.TypedName())

return result, err
}